This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5bf6f7892a [Improve][Zeta] Add retry when request slot on resource
manager (#7049)
5bf6f7892a is described below
commit 5bf6f7892a65ed26ef0bee3d6e21638bd7cab37b
Author: Jia Fan <[email protected]>
AuthorDate: Wed Jul 10 22:28:42 2024 +0800
[Improve][Zeta] Add retry when request slot on resource manager (#7049)
---
.../resourcemanager/AbstractResourceManager.java | 4 +-
.../resourcemanager/ResourceRequestHandler.java | 118 +++++++++++++-----
.../opeartion/ReleaseSlotOperation.java | 8 ++
.../resourcemanager/worker/WorkerProfile.java | 4 +
.../server/service/slot/DefaultSlotService.java | 6 +
.../server/service/slot/SlotAndWorkerProfile.java | 1 +
.../resourcemanager/FakeResourceManager.java | 49 +++-----
...FakeResourceManagerForRequestSlotRetryTest.java | 133 +++++++++++++++++++++
.../resourcemanager/ResourceManagerTest.java | 115 ++++++++++++++++--
9 files changed, 363 insertions(+), 75 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index de93bf72ed..5fe29fa6f1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -223,7 +223,9 @@ public abstract class AbstractResourceManager implements
ResourceManager {
@Override
public CompletableFuture<Void> releaseResource(long jobId, SlotProfile
profile) {
if (nodeEngine.getClusterService().getMember(profile.getWorker()) !=
null) {
- return sendToMember(new ReleaseSlotOperation(jobId, profile),
profile.getWorker());
+ CompletableFuture<WorkerProfile> future =
+ sendToMember(new ReleaseSlotOperation(jobId, profile),
profile.getWorker());
+ return future.thenAccept(this::heartbeat);
} else {
return CompletableFuture.completedFuture(null);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 0af3738a4a..0009357fcc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -33,7 +34,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -54,6 +54,8 @@ public class ResourceRequestHandler {
private final ConcurrentMap<Integer, SlotProfile> resultSlotProfiles;
private final ConcurrentMap<Address, WorkerProfile> registerWorker;
+ private static final int MAX_RETRY_TIMES = 3;
+
private final long jobId;
private final List<ResourceProfile> resourceProfile;
@@ -74,49 +76,94 @@ public class ResourceRequestHandler {
}
public CompletableFuture<List<SlotProfile>> request(Map<String, String>
tags) {
- List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new
ArrayList<>();
- for (int i = 0; i < resourceProfile.size(); i++) {
- ResourceProfile r = resourceProfile.get(i);
- Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r);
- if (workerProfile.isPresent()) {
- // request slot to member
- CompletableFuture<SlotAndWorkerProfile>
internalCompletableFuture =
- singleResourceRequestToMember(i, r,
workerProfile.get());
- allRequestFuture.add(internalCompletableFuture);
- }
+ requestSlotWithRetry(stillNeedRequestIndexes(), MAX_RETRY_TIMES, tags);
+ return completableFuture;
+ }
+
+ private CompletableFuture<SlotAndWorkerProfile> requestSlotWithRetry(
+ List<Integer> requestIndexes, int retryTimes, Map<String, String>
tags) {
+ if (retryTimes <= 0) {
+ LOGGER.fine("can't apply resource request with retry times: " +
MAX_RETRY_TIMES);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ throw new NoEnoughResourceException(
+ "can't apply resource request with retry
times: "
+ + MAX_RETRY_TIMES);
+ });
}
+ List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture =
requestSlots(requestIndexes);
// all resource preCheck done, also had sent request to worker
- getAllOfFuture(allRequestFuture)
+ return getAllOfFuture(allRequestFuture)
.whenComplete(
withTryCatch(
LOGGER,
(unused, error) -> {
if (error != null) {
completeRequestWithException(error);
- }
- if (resultSlotProfiles.size() <
resourceProfile.size()) {
- // meaning have some slot not request
success
- if
(resourceManager.supportDynamicWorker()) {
- applyByDynamicWorker(tags);
- } else {
- completeRequestWithException(
- new
NoEnoughResourceException(
- "can't apply
resource request: "
- +
resourceProfile.get(
-
findNullIndexInResultSlotProfiles())));
+ } else {
+ List<Integer>
needRequestIndexes =
+ stillNeedRequestIndexes();
+ if (!needRequestIndexes.isEmpty()) {
+ Exception
requestSlotWithRetryError = null;
+ try {
+ requestSlotWithRetry(
+
needRequestIndexes,
+ retryTimes - 1,
+ tags)
+ .get();
+ } catch (Exception e) {
+ LOGGER.warning(
+ "request slot with
retry error: "
+ +
e.getMessage(), e);
+ requestSlotWithRetryError = e;
+ }
+ if (requestSlotWithRetryError !=
null) {
+ // retries exhausted: fall back to dynamic workers or fail
+ if
(resourceManager.supportDynamicWorker()) {
+ applyByDynamicWorker(tags);
+ } else {
+
completeRequestWithException(
+
requestSlotWithRetryError);
+ }
+ }
}
}
}));
- return completableFuture;
}
- private int findNullIndexInResultSlotProfiles() {
+ private List<Integer> stillNeedRequestIndexes() {
+ List<Integer> needRequestIndexes = new ArrayList<>(); // positions in resourceProfile
for (int i = 0; i < resourceProfile.size(); i++) {
if (!resultSlotProfiles.containsKey(i)) {
- return i;
+ needRequestIndexes.add(i);
+ }
+ }
+ return needRequestIndexes;
+ }
+
+ private List<CompletableFuture<SlotAndWorkerProfile>> requestSlots(
+ List<Integer> requestIndexes) {
+ List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new
ArrayList<>();
+ for (int index : requestIndexes) {
+ ResourceProfile r = resourceProfile.get(index);
+ Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r);
+ if (workerProfile.isPresent()) {
+ // request slot to member; the result is stored under the original resourceProfile index
+ CompletableFuture<SlotAndWorkerProfile>
internalCompletableFuture =
+ singleResourceRequestToMember(index, r,
workerProfile.get());
+ allRequestFuture.add(internalCompletableFuture);
+ } else {
+ // if no worker can provide the resource, we should return a
failed future
+ LOGGER.fine("pre check worker resource failed, can't apply
resource request: " + r);
+ allRequestFuture.add(
+ CompletableFuture.supplyAsync(
+ () -> {
+ throw new NoEnoughResourceException(
+ "can't apply resource request: " +
r);
+ }));
}
}
- return -1;
+ return allRequestFuture;
}
private void completeRequestWithException(Throwable e) {
@@ -125,6 +172,7 @@ public class ResourceRequestHandler {
}
private void addSlotToCacheMap(int index, SlotProfile slotProfile) {
+ // null value means the slot request failed, no suitable slot found
if (null != slotProfile) {
resultSlotProfiles.put(index, slotProfile);
if (resultSlotProfiles.size() == resourceProfile.size()) {
@@ -134,6 +182,8 @@ public class ResourceRequestHandler {
}
completableFuture.complete(value);
}
+ } else {
+ LOGGER.fine("no suitable slot found for resource: " +
resourceProfile.get(index));
}
}
@@ -155,7 +205,8 @@ public class ResourceRequestHandler {
}));
}
- private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
+ @VisibleForTesting
+ public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
// Shuffle the order to ensure random selection of workers
List<WorkerProfile> workerProfiles =
Arrays.asList(registerWorker.values().toArray(new
WorkerProfile[0]));
@@ -176,6 +227,7 @@ public class ResourceRequestHandler {
// Check if there are still unassigned resources
workerProfile =
workerProfiles.stream()
+ .filter(WorkerProfile::isDynamicSlot)
.filter(worker ->
worker.getUnassignedResource().enoughThan(r))
.findAny();
}
@@ -217,11 +269,13 @@ public class ResourceRequestHandler {
private void releaseAllResourceInternal() {
LOGGER.warning("apply resource not success, release all already
applied resource");
- resultSlotProfiles.values().stream()
- .filter(Objects::nonNull)
+ new ArrayList<>(resultSlotProfiles.keySet())
.forEach(
- profile -> {
- resourceManager.releaseResource(jobId, profile);
+ index -> {
+ SlotProfile profile =
resultSlotProfiles.remove(index);
+ if (profile != null) {
+ resourceManager.releaseResource(jobId,
profile);
+ }
});
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
index e812815746..2d2a0d0416 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.engine.server.resourcemanager.opeartion;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
import
org.apache.seatunnel.engine.server.service.slot.WrongTargetSlotException;
@@ -36,6 +37,7 @@ public class ReleaseSlotOperation extends Operation
implements IdentifiedDataSer
private long jobID;
private SlotProfile slotProfile;
+ private WorkerProfile result;
public ReleaseSlotOperation() {}
@@ -56,6 +58,12 @@ public class ReleaseSlotOperation extends Operation
implements IdentifiedDataSer
slotProfile,
ExceptionUtils.getMessage(ignore));
}
+ result = server.getSlotService().getWorkerProfile();
+ }
+
+ @Override
+ public Object getResponse() {
+ return result;
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 291df1f1f8..0d0f8c8054 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -44,6 +44,8 @@ public class WorkerProfile implements
IdentifiedDataSerializable {
private ResourceProfile unassignedResource;
+ private boolean dynamicSlot;
+
private SlotProfile[] assignedSlots;
private SlotProfile[] unassignedSlots;
@@ -82,6 +84,7 @@ public class WorkerProfile implements
IdentifiedDataSerializable {
for (SlotProfile unassignedSlot : unassignedSlots) {
out.writeObject(unassignedSlot);
}
+ out.writeBoolean(dynamicSlot);
}
@Override
@@ -99,5 +102,6 @@ public class WorkerProfile implements
IdentifiedDataSerializable {
for (int i = 0; i < unassignedSlots.length; i++) {
unassignedSlots[i] = in.readObject();
}
+ dynamicSlot = in.readBoolean();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 250a6f2eb4..7c0ae38bfd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -196,6 +196,11 @@ public class DefaultSlotService implements SlotService {
}
}
+ /**
+ * Select the best match slot for the profile.
+ *
+ * @return the best match slot, null if no suitable slot found.
+ */
private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
if (unassignedSlots.isEmpty() && !config.isDynamicSlot()) {
return null;
@@ -259,6 +264,7 @@ public class DefaultSlotService implements SlotService {
workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new
SlotProfile[0]));
workerProfile.setUnassignedResource(unassignedResource.get());
workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
+ workerProfile.setDynamicSlot(config.isDynamicSlot());
return workerProfile;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
index dc9a46d63c..a9168518e7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
@@ -31,6 +31,7 @@ public class SlotAndWorkerProfile implements
IdentifiedDataSerializable {
private WorkerProfile workerProfile;
+ // null value means the slot request failed, no suitable slot found
private SlotProfile slotProfile;
public SlotAndWorkerProfile() {}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
index 8d45ef2d49..0118c15879 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -42,42 +42,28 @@ public class FakeResourceManager extends
AbstractResourceManager {
@Override
public void init() {
try {
- Address address1 = new Address("localhost", 5801);
- WorkerProfile workerProfile1 =
- new WorkerProfile(
- address1,
- new ResourceProfile(),
- new ResourceProfile(),
- new SlotProfile[] {},
- new SlotProfile[] {},
- Collections.emptyMap());
- this.registerWorker.put(address1, workerProfile1);
-
- Address address2 = new Address("localhost", 5802);
- WorkerProfile workerProfile2 =
- new WorkerProfile(
- address2,
- new ResourceProfile(),
- new ResourceProfile(),
- new SlotProfile[] {},
- new SlotProfile[] {},
- Collections.emptyMap());
- this.registerWorker.put(address2, workerProfile2);
- Address address3 = new Address("localhost", 5803);
- WorkerProfile workerProfile3 =
- new WorkerProfile(
- address3,
- new ResourceProfile(),
- new ResourceProfile(),
- new SlotProfile[] {},
- new SlotProfile[] {},
- Collections.emptyMap());
- this.registerWorker.put(address3, workerProfile3);
+ generateWorker(5801);
+ generateWorker(5802);
+ generateWorker(5803);
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
+ private void generateWorker(int port) throws UnknownHostException {
+ Address address = new Address("localhost", port);
+ WorkerProfile workerProfile =
+ new WorkerProfile(
+ address,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ true,
+ new SlotProfile[] {},
+ new SlotProfile[] {},
+ Collections.emptyMap());
+ this.registerWorker.put(address, workerProfile);
+ }
+
@Override
protected <E> CompletableFuture<E> sendToMember(Operation operation,
Address address) {
if (operation instanceof RequestSlotOperation) {
@@ -88,6 +74,7 @@ public class FakeResourceManager extends
AbstractResourceManager {
address,
new ResourceProfile(),
new ResourceProfile(),
+ true,
new SlotProfile[] {},
new SlotProfile[] {},
Collections.emptyMap()),
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
new file mode 100644
index 0000000000..1dc427e4f8
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.config.EngineConfig;
+import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Used to test ResourceManager, override init method to register more
workers. */
+public class FakeResourceManagerForRequestSlotRetryTest extends
AbstractResourceManager {
+
+ private final int newWorkerCount;
+ private final int noSlotWorkerCount;
+ private final AtomicInteger queryIndex = new AtomicInteger(0);
+
+ private final Set<Address> cannotRequestAddress = new HashSet<>();
+
+ public FakeResourceManagerForRequestSlotRetryTest(
+ NodeEngine nodeEngine, int newWorkerCount, int noSlotWorkerCount) {
+ super(nodeEngine, new EngineConfig());
+ this.newWorkerCount = newWorkerCount;
+ this.noSlotWorkerCount = noSlotWorkerCount;
+ init();
+ }
+
+ @Override
+ public void init() {
+ try {
+ for (int i = 0; i < newWorkerCount; i++) {
+ generateWorker(5801 + i);
+ }
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void generateWorker(int port) throws UnknownHostException {
+ Address address = new Address("localhost", port);
+ WorkerProfile workerProfile =
+ new WorkerProfile(
+ address,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ false,
+ new SlotProfile[] {},
+ new SlotProfile[] {
+ new SlotProfile(address, 1, new ResourceProfile(),
""),
+ new SlotProfile(address, 2, new ResourceProfile(),
"")
+ },
+ Collections.emptyMap());
+ this.registerWorker.put(address, workerProfile);
+ }
+
+ @Override
+ protected <E> CompletableFuture<E> sendToMember(Operation operation,
Address address) {
+ if (operation instanceof RequestSlotOperation) {
+ if (cannotRequestAddress.contains(address)) {
+ throw new IllegalStateException("Cannot request slot for " +
address);
+ }
+ if (queryIndex.getAndIncrement() < noSlotWorkerCount) {
+ cannotRequestAddress.add(address);
+ // query will return empty slot
+ return (CompletableFuture<E>)
+ CompletableFuture.completedFuture(
+ new SlotAndWorkerProfile(
+ new WorkerProfile(
+ address,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ false,
+ new SlotProfile[] {
+ new SlotProfile(
+ address, 1, new
ResourceProfile(), ""),
+ new SlotProfile(
+ address, 2, new
ResourceProfile(), "")
+ },
+ // no unassigned slot
+ new SlotProfile[] {},
+ Collections.emptyMap()),
+ null));
+ }
+ return (CompletableFuture<E>)
+ CompletableFuture.completedFuture(
+ new SlotAndWorkerProfile(
+ new WorkerProfile(
+ address,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ false,
+ new SlotProfile[] {
+ new SlotProfile(
+ address, 1, new
ResourceProfile(), "")
+ },
+ new SlotProfile[] {
+ new SlotProfile(
+ address, 3, new
ResourceProfile(), "")
+ },
+ Collections.emptyMap()),
+ new SlotProfile(address, 2, new
ResourceProfile(), "")));
+ } else {
+ return super.sendToMember(operation, address);
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
index 2589e6530c..abd4ccdc09 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -29,9 +30,14 @@ import org.junit.jupiter.api.Test;
import com.hazelcast.cluster.Address;
+import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -87,23 +93,110 @@ public class ResourceManagerTest extends
AbstractSeaTunnelServerTest<ResourceMan
public void testApplyResourceWithRandomResult()
throws ExecutionException, InterruptedException {
FakeResourceManager resourceManager = new
FakeResourceManager(nodeEngine);
+ boolean hasDifferentWorker = false;
+ for (int i = 0; i < 5; i++) {
+ List<ResourceProfile> resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ resourceProfiles.add(new ResourceProfile());
+ List<SlotProfile> slotProfiles =
+ resourceManager.applyResources(1L, resourceProfiles,
null).get();
+ Assertions.assertEquals(slotProfiles.size(), 5);
+ Set<Address> addresses =
+
slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet());
+ hasDifferentWorker |= addresses.size() > 1;
+ }
+ Assertions.assertTrue(hasDifferentWorker, "should have different
worker for each slot");
+ }
+ @Test
+ public void testApplyResourceWithRetryWhenSameNodeNoSlotSuited()
+ throws ExecutionException, InterruptedException {
+ // test retry request slot times 1
+ FakeResourceManagerForRequestSlotRetryTest resourceManager =
+ new FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 2,
1);
List<ResourceProfile> resourceProfiles = new ArrayList<>();
resourceProfiles.add(new ResourceProfile());
resourceProfiles.add(new ResourceProfile());
- resourceProfiles.add(new ResourceProfile());
- resourceProfiles.add(new ResourceProfile());
- resourceProfiles.add(new ResourceProfile());
List<SlotProfile> slotProfiles =
resourceManager.applyResources(1L, resourceProfiles,
null).get();
- Assertions.assertEquals(slotProfiles.size(), 5);
+ Assertions.assertEquals(slotProfiles.size(), 2);
- boolean hasDifferentWorker = false;
- for (int i = 0; i < 5; i++) {
- Set<Address> addresses =
-
slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet());
- hasDifferentWorker = addresses.size() > 1;
- }
- Assertions.assertTrue(hasDifferentWorker, "should have different
worker for each slot");
+ // test retry request slot time 2 but no enough slot with worker
+ resourceManager = new
FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 2, 2);
+ FakeResourceManagerForRequestSlotRetryTest finalResourceManager =
resourceManager;
+ List<ResourceProfile> finalResourceProfiles = resourceProfiles;
+ ExecutionException exception =
+ Assertions.assertThrows(
+ ExecutionException.class,
+ () ->
+ finalResourceManager
+ .applyResources(1L,
finalResourceProfiles, null)
+ .get());
+ Assertions.assertInstanceOf(NoEnoughResourceException.class,
exception.getCause());
+
+ // test retry request slot time 4 so that more than max retry times
+ resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile());
+ resourceManager = new
FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 5, 4);
+ List<ResourceProfile> finalResourceProfiles2 = resourceProfiles;
+ FakeResourceManagerForRequestSlotRetryTest finalResourceManager2 =
resourceManager;
+ ExecutionException exception2 =
+ Assertions.assertThrows(
+ ExecutionException.class,
+ () ->
+ finalResourceManager2
+ .applyResources(1L,
finalResourceProfiles2, null)
+ .get());
+ Assertions.assertInstanceOf(
+ NoEnoughResourceException.class,
exception2.getCause().getCause());
+ Assertions.assertEquals(
+ "can't apply resource request with retry times: 3",
+ exception2.getCause().getCause().getMessage());
+ }
+
+ @Test
+ public void testPreCheckWorkerResourceWithDynamicSlot() throws
UnknownHostException {
+ testPreCheckWorkerResource(true);
+ testPreCheckWorkerResource(false);
+ }
+
+ public void testPreCheckWorkerResource(boolean dynamicSlot) throws
UnknownHostException {
+ List<ResourceProfile> resourceProfiles = new ArrayList<>();
+ resourceProfiles.add(new ResourceProfile());
+ ConcurrentMap<Address, WorkerProfile> registerWorker = new
ConcurrentHashMap<>();
+ Address address1 = new Address("localhost", 5801);
+ WorkerProfile workerProfile1 =
+ new WorkerProfile(
+ address1,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ dynamicSlot,
+ new SlotProfile[] {},
+ new SlotProfile[] {},
+ Collections.emptyMap());
+ registerWorker.put(address1, workerProfile1);
+
+ Address address2 = new Address("localhost", 5802);
+ WorkerProfile workerProfile2 =
+ new WorkerProfile(
+ address2,
+ new ResourceProfile(),
+ new ResourceProfile(),
+ dynamicSlot,
+ new SlotProfile[] {},
+ new SlotProfile[] {},
+ Collections.emptyMap());
+ registerWorker.put(address2, workerProfile2);
+ Optional<WorkerProfile> result =
+ new ResourceRequestHandler(
+ jobId,
+ resourceProfiles,
+ registerWorker,
+ (AbstractResourceManager) this.resourceManager)
+ .preCheckWorkerResource(new ResourceProfile());
+ Assertions.assertEquals(result.isPresent(), dynamicSlot);
}
}