This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new d2d14381d [CELEBORN-695] Fix UnsupportedOperationException by
refactoring WorkerInfo
d2d14381d is described below
commit d2d14381db1047ed2c0f965ad6929c2dfb921768
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jun 19 19:38:55 2023 +0800
[CELEBORN-695] Fix UnsupportedOperationException by refactoring WorkerInfo
### What changes were proposed in this pull request?
Refactor WorkerInfo
1. make ```diskInfos``` and ```userResourceConsumption``` new maps copied from
the constructor arguments instead of reusing the passed-in references
2. remove ```endpoint``` from the constructor
### Why are the changes needed?
When manually testing stop-worker.sh with graceful shutdown turned on, I got
the following exception:
```
23/06/19 11:04:25,665 INFO [worker-forward-message-scheduler]
RssHARetryClient: connect to master master-1-1:9097.
23/06/19 11:04:27,168 ERROR [worker-forward-message-scheduler]
RssHARetryClient: Send rpc with failure, has tried 15, max try 15!
org.apache.celeborn.common.exception.CelebornException: Exception thrown in
awaitResult:
at
org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:231)
at
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
at
org.apache.celeborn.common.haclient.RssHARetryClient.sendMessageInner(RssHARetryClient.java:150)
at
org.apache.celeborn.common.haclient.RssHARetryClient.askSync(RssHARetryClient.java:118)
at
org.apache.celeborn.service.deploy.worker.Worker.org$apache$celeborn$service$deploy$worker$Worker$$heartBeatToMaster(Worker.scala:306)
at
org.apache.celeborn.service.deploy.worker.Worker$$anon$1.$anonfun$run$1(Worker.scala:332)
at
org.apache.celeborn.common.util.Utils$.tryLogNonFatalError(Utils.scala:186)
at
org.apache.celeborn.service.deploy.worker.Worker$$anon$1.run(Worker.scala:332)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.celeborn.common.exception.CelebornIOException: remove
at
org.apache.celeborn.service.deploy.master.clustermeta.ha.HAHelper.sendFailure(HAHelper.java:65)
at
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:210)
at
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:315)
at
org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at
org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:222)
at
org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:110)
at
org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:229)
... 3 more
Caused by: java.lang.UnsupportedOperationException: remove
at
scala.collection.convert.Wrappers$MapWrapper$$anon$2$$anon$3.remove(Wrappers.scala:236)
at java.util.AbstractMap.remove(AbstractMap.java:254)
at
org.apache.celeborn.common.meta.WorkerInfo.$anonfun$updateThenGetDiskInfos$2(WorkerInfo.scala:225)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at
org.apache.celeborn.common.meta.WorkerInfo.updateThenGetDiskInfos(WorkerInfo.scala:224)
at
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.lambda$updateWorkerHeartbeatMeta$5(AbstractMetaManager.java:205)
at java.util.Optional.ifPresent(Optional.java:159)
at
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.updateWorkerHeartbeatMeta(AbstractMetaManager.java:203)
at
org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager.handleWorkerHeartbeat(SingleMasterMetaManager.java:105)
at
org.apache.celeborn.service.deploy.master.Master.org$apache$celeborn$service$deploy$master$Master$$handleHeartbeatFromWorker(Master.scala:428)
at
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$20(Master.scala:326)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:207)
... 8 more
```
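The trace shows that the map passed in to WorkerInfo is a Scala collection
wrapper that does not support ```remove```, so WorkerInfo now copies the
passed-in maps into its own maps, following the suggestion from
https://github.com/apache/incubator-celeborn/pull/1602#issuecomment-1596722991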
### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug described in
https://github.com/apache/incubator-celeborn/pull/1602
### How was this patch tested?
UTs and manual test.
Closes #1605 from waitinfuture/695.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 7d634db54712a0924862581a8c9630f3b22cd1cd)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/common/meta/WorkerInfo.scala | 34 +++++--------
.../apache/celeborn/common/util/PbSerDeUtils.scala | 3 +-
.../celeborn/common/meta/WorkerInfoSuite.scala | 56 ++++++++--------------
.../celeborn/common/util/PbSerDeUtilsTest.scala | 4 +-
.../master/clustermeta/AbstractMetaManager.java | 22 ++-------
.../celeborn/service/deploy/master/Master.scala | 6 +--
.../master/SlotsAllocatorRackAwareSuiteJ.java | 12 ++---
.../deploy/master/SlotsAllocatorSuiteJ.java | 6 +--
.../clustermeta/DefaultMetaSystemSuiteJ.java | 27 ++++-------
.../clustermeta/ha/MasterStateMachineSuiteJ.java | 6 +--
.../ha/RatisMasterStatusSystemSuiteJ.java | 27 ++++-------
.../celeborn/service/deploy/worker/Worker.scala | 3 +-
12 files changed, 71 insertions(+), 135 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 02f036bcb..b1bf51cd8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -34,12 +34,19 @@ class WorkerInfo(
val pushPort: Int,
val fetchPort: Int,
val replicatePort: Int,
- val diskInfos: util.Map[String, DiskInfo],
- val userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
- var endpoint: RpcEndpointRef) extends Serializable with Logging {
+ _diskInfos: util.Map[String, DiskInfo],
+ _userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption])
extends Serializable
+ with Logging {
var unknownDiskSlots = new java.util.HashMap[String, Integer]()
var networkLocation = "/default-rack"
var lastHeartbeat: Long = 0
+ val diskInfos =
+ if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String,
DiskInfo](_diskInfos) else null
+ val userResourceConsumption =
+ if (_userResourceConsumption != null)
+ JavaUtils.newConcurrentHashMap[UserIdentifier,
ResourceConsumption](_userResourceConsumption)
+ else null
+ var endpoint: RpcEndpointRef = null
def this(host: String, rpcPort: Int, pushPort: Int, fetchPort: Int,
replicatePort: Int) {
this(
@@ -49,26 +56,7 @@ class WorkerInfo(
fetchPort,
replicatePort,
new util.HashMap[String, DiskInfo](),
- JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](),
- null)
- }
-
- def this(
- host: String,
- rpcPort: Int,
- pushPort: Int,
- fetchPort: Int,
- replicatePort: Int,
- endpoint: RpcEndpointRef) {
- this(
- host,
- rpcPort,
- pushPort,
- fetchPort,
- replicatePort,
- new util.HashMap[String, DiskInfo](),
- JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](),
- endpoint)
+ new util.HashMap[UserIdentifier, ResourceConsumption]())
}
val allocationBuckets = new Array[Int](61)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index e6e7a1e6f..be4aab245 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -191,8 +191,7 @@ object PbSerDeUtils {
pbWorkerInfo.getFetchPort,
pbWorkerInfo.getReplicatePort,
disks,
- userResourceConsumption,
- null)
+ userResourceConsumption)
}
def toPbWorkerInfo(
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index 8ccf1d295..221dffa47 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -56,7 +56,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
replicatePort: Int,
workerInfos: jMap[WorkerInfo, util.Map[String, Integer]],
allocationMap: util.Map[String, Integer]): Unit = {
- val worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort, null)
+ val worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort)
val realWorker = workerInfos.get(worker)
assertNotNull(s"Worker $worker didn't exist.", realWorker)
}
@@ -71,7 +71,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]()
userResourceConsumption.put(UserIdentifier("tenant1", "name1"),
ResourceConsumption(1, 1, 1, 1))
val worker =
- new WorkerInfo("localhost", 10000, 10001, 10002, 10003, disks,
userResourceConsumption, null)
+ new WorkerInfo("localhost", 10000, 10001, 10002, 10003, disks,
userResourceConsumption)
val allocatedSlots = new AtomicInteger(0)
val shuffleKey = "appId-shuffleId"
@@ -137,32 +137,32 @@ class WorkerInfoSuite extends CelebornFunSuite {
}
test("WorkerInfo not equals when host different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
- val worker2 = new WorkerInfo("h2", 10001, 10002, 10003, 1000, null, null,
null)
+ val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 = new WorkerInfo("h2", 10001, 10002, 10003, 1000, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when rpc port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
- val worker2 = new WorkerInfo("h1", 20001, 10002, 10003, 1000, null, null,
null)
+ val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 = new WorkerInfo("h1", 20001, 10002, 10003, 1000, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when push port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
- val worker2 = new WorkerInfo("h1", 10001, 20002, 10003, 1000, null, null,
null)
+ val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 = new WorkerInfo("h1", 10001, 20002, 10003, 1000, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when fetch port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 20003, 1000, null, null,
null)
+ val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 = new WorkerInfo("h1", 10001, 10002, 20003, 1000, null, null)
assertNotEquals(worker1, worker2)
}
test("WorkerInfo not equals when replicate port different.") {
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 2000, null, null,
null)
+ val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 2000, null, null)
assertNotEquals(worker1, worker2)
}
@@ -174,9 +174,8 @@ class WorkerInfoSuite extends CelebornFunSuite {
10003,
1000,
new util.HashMap[String, DiskInfo](),
- null,
null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
+ val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
assertEquals(worker1, worker2)
}
@@ -188,31 +187,20 @@ class WorkerInfoSuite extends CelebornFunSuite {
10003,
1000,
null,
- new util.HashMap[UserIdentifier, ResourceConsumption](),
- null)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
+ new util.HashMap[UserIdentifier, ResourceConsumption]())
+ val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
assertEquals(worker1, worker2)
}
test("WorkerInfo equals when endpoint different") {
- val mockEndpoint = new RpcEndpointRef(new CelebornConf()) {
-
- override def address: RpcAddress = ???
-
- override def name: String = ???
-
- override def send(message: Any): Unit = ???
-
- override def ask[T: ClassTag](message: Any, timeout: RpcTimeout):
concurrent.Future[T] = ???
- }
- val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
mockEndpoint)
- val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null,
null)
+ val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+ val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
assertEquals(worker1, worker2)
}
test("WorkerInfo toString output") {
val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000)
- val worker2 = new WorkerInfo("h2", 20001, 20002, 20003, 2000, null, null,
null)
+ val worker2 = new WorkerInfo("h2", 20001, 20002, 20003, 2000, null, null)
val worker3 = new WorkerInfo(
"h3",
@@ -221,7 +209,6 @@ class WorkerInfoSuite extends CelebornFunSuite {
30003,
3000,
new util.HashMap[String, DiskInfo](),
- null,
null)
val disks = new util.HashMap[String, DiskInfo]()
@@ -236,8 +223,6 @@ class WorkerInfoSuite extends CelebornFunSuite {
val conf = new CelebornConf()
val endpointAddress = new RpcEndpointAddress(new RpcAddress("localhost",
12345), "mockRpc")
val rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345,
conf, 64)
- val rpcEndpointRef =
- new NettyRpcEndpointRef(conf, endpointAddress,
rpcEnv.asInstanceOf[NettyRpcEnv])
val worker4 = new WorkerInfo(
"h4",
40001,
@@ -245,8 +230,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
40003,
4000,
disks,
- userResourceConsumption,
- rpcEndpointRef)
+ userResourceConsumption)
val placeholder = ""
val exp1 =
@@ -304,7 +288,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
| DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2147483647,
avgFlushTime: 2, avgFetchTime: 2, activeSlots: 20) status: HEALTHY dirs
$placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption:
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1,
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
- |WorkerRef: NettyRpcEndpointRef(rss://mockRpc@localhost:12345)
+ |WorkerRef: null
|""".stripMargin;
println(worker1)
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index fa60db3f7..8ac9c7138 100644
---
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -68,9 +68,9 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
userResourceConsumption.put(userIdentifier2, resourceConsumption2)
val workerInfo1 =
- new WorkerInfo("localhost", 1001, 1002, 1003, 1004, diskInfos,
userResourceConsumption, null)
+ new WorkerInfo("localhost", 1001, 1002, 1003, 1004, diskInfos,
userResourceConsumption)
val workerInfo2 =
- new WorkerInfo("localhost", 2001, 2002, 2003, 2004, diskInfos,
userResourceConsumption, null)
+ new WorkerInfo("localhost", 2001, 2002, 2003, 2004, diskInfos,
userResourceConsumption)
val partitionLocation1 =
new PartitionLocation(0, 0, "host1", 10, 9, 8, 14,
PartitionLocation.Mode.SLAVE)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 20ccd764b..5275ee720 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -152,7 +152,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void updateWorkerLostMeta(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort) {
- WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort, null);
+ WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort);
workerLostEvents.add(worker);
// remove worker from workers
synchronized (workers) {
@@ -166,7 +166,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void updateWorkerRemoveMeta(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort) {
- WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort, null);
+ WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort);
// remove worker from workers
synchronized (workers) {
workers.remove(worker);
@@ -188,14 +188,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
long time) {
WorkerInfo worker =
new WorkerInfo(
- host,
- rpcPort,
- pushPort,
- fetchPort,
- replicatePort,
- disks,
- userResourceConsumption,
- null);
+ host, rpcPort, pushPort, fetchPort, replicatePort, disks,
userResourceConsumption);
AtomicLong availableSlots = new AtomicLong();
LOG.debug("update worker {}:{} heart beat {}", host, rpcPort, disks);
synchronized (workers) {
@@ -228,14 +221,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
Map<UserIdentifier, ResourceConsumption> userResourceConsumption) {
WorkerInfo workerInfo =
new WorkerInfo(
- host,
- rpcPort,
- pushPort,
- fetchPort,
- replicatePort,
- disks,
- userResourceConsumption,
- null);
+ host, rpcPort, pushPort, fetchPort, replicatePort, disks,
userResourceConsumption);
workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 563580101..7d7811a1a 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -454,8 +454,7 @@ private[celeborn] class Master(
fetchPort,
replicatePort,
new util.HashMap[String, DiskInfo](),
- JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](),
- null)
+ JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
val worker: WorkerInfo = workersSnapShot
.asScala
.find(_ == targetWorker)
@@ -489,8 +488,7 @@ private[celeborn] class Master(
fetchPort,
replicatePort,
disks,
- userResourceConsumption,
- null)
+ userResourceConsumption)
if (workersSnapShot.contains(workerToRegister)) {
logWarning(s"Receive RegisterWorker while worker" +
s" ${workerToRegister.toString()} already exists, re-register.")
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index 4ed0e6f48..bc7b54b33 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -120,12 +120,12 @@ public class SlotsAllocatorRackAwareSuiteJ {
private List<WorkerInfo> prepareWorkers(CelebornRackResolver resolver) {
ArrayList<WorkerInfo> workers = new ArrayList<>(3);
- workers.add(new WorkerInfo("host1", 9, 10, 110, 113, new HashMap<>(),
null, null));
- workers.add(new WorkerInfo("host2", 9, 11, 111, 114, new HashMap<>(),
null, null));
- workers.add(new WorkerInfo("host3", 9, 12, 112, 115, new HashMap<>(),
null, null));
- workers.add(new WorkerInfo("host4", 9, 10, 110, 113, new HashMap<>(),
null, null));
- workers.add(new WorkerInfo("host5", 9, 11, 111, 114, new HashMap<>(),
null, null));
- workers.add(new WorkerInfo("host6", 9, 12, 112, 115, new HashMap<>(),
null, null));
+ workers.add(new WorkerInfo("host1", 9, 10, 110, 113, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host2", 9, 11, 111, 114, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host3", 9, 12, 112, 115, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host4", 9, 10, 110, 113, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host5", 9, 11, 111, 114, new HashMap<>(),
null));
+ workers.add(new WorkerInfo("host6", 9, 12, 112, 115, new HashMap<>(),
null));
workers.forEach(
new Consumer<WorkerInfo>() {
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 900752672..ef62ac935 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -130,9 +130,9 @@ public class SlotsAllocatorSuiteJ {
disks3.put("/mnt/disk3", diskInfo9);
ArrayList<WorkerInfo> workers = new ArrayList<>(3);
- workers.add(new WorkerInfo("host1", 9, 10, 110, 113, disks1, null, null));
- workers.add(new WorkerInfo("host2", 9, 11, 111, 114, disks2, null, null));
- workers.add(new WorkerInfo("host3", 9, 12, 112, 115, disks3, null, null));
+ workers.add(new WorkerInfo("host1", 9, 10, 110, 113, disks1, null));
+ workers.add(new WorkerInfo("host2", 9, 11, 111, 114, disks2, null));
+ workers.add(new WorkerInfo("host3", 9, 12, 112, 115, disks3, null));
return workers;
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index f58ebba2a..115426c8b 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -220,8 +220,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -230,8 +229,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
WorkerInfo workerInfo3 =
new WorkerInfo(
HOSTNAME3,
@@ -240,8 +238,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT3,
REPLICATEPORT3,
disks3,
- userResourceConsumption3,
- dummyRef);
+ userResourceConsumption3);
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocation = new HashMap<>();
@@ -369,8 +366,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -379,8 +375,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocation = new HashMap<>();
@@ -435,8 +430,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -445,8 +439,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocation = new HashMap<>();
@@ -587,8 +580,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -597,8 +589,7 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
List<WorkerInfo> failedWorkers = new ArrayList<>();
failedWorkers.add(workerInfo1);
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index df2e9bc26..c092fbf7a 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -246,9 +246,9 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
userResourceConsumption3.put(
new UserIdentifier("tenant3", "name3"), new ResourceConsumption(3000,
3, 3000, 3));
- WorkerInfo info1 = new WorkerInfo("host1", 1, 2, 3, 10, disks1,
userResourceConsumption1, null);
- WorkerInfo info2 = new WorkerInfo("host2", 4, 5, 6, 11, disks2,
userResourceConsumption2, null);
- WorkerInfo info3 = new WorkerInfo("host3", 7, 8, 9, 12, disks3,
userResourceConsumption3, null);
+ WorkerInfo info1 = new WorkerInfo("host1", 1, 2, 3, 10, disks1,
userResourceConsumption1);
+ WorkerInfo info2 = new WorkerInfo("host2", 4, 5, 6, 11, disks2,
userResourceConsumption2);
+ WorkerInfo info3 = new WorkerInfo("host3", 7, 8, 9, 12, disks3,
userResourceConsumption3);
String host1 = "host1";
String host2 = "host2";
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 6fad572af..fdb0ca0e1 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -368,8 +368,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -378,8 +377,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
WorkerInfo workerInfo3 =
new WorkerInfo(
HOSTNAME3,
@@ -388,8 +386,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT3,
REPLICATEPORT3,
disks3,
- userResourceConsumption3,
- dummyRef);
+ userResourceConsumption3);
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocation1 = new HashMap<>();
@@ -566,8 +563,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -576,8 +572,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocations = new HashMap<>();
allocations.put("disk1", 5);
@@ -640,8 +635,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -650,8 +644,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocations = new HashMap<>();
@@ -869,8 +862,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1,
- dummyRef);
+ userResourceConsumption1);
WorkerInfo workerInfo2 =
new WorkerInfo(
HOSTNAME2,
@@ -879,8 +871,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT2,
REPLICATEPORT2,
disks2,
- userResourceConsumption2,
- dummyRef);
+ userResourceConsumption2);
List<WorkerInfo> failedWorkers = new ArrayList<>();
failedWorkers.add(workerInfo1);
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 361a715ad..6cdbf7913 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -207,8 +207,7 @@ private[celeborn] class Worker(
fetchPort,
replicatePort,
diskInfos,
- userResourceConsumption,
- controller.self)
+ userResourceConsumption)
// whether this Worker registered to Master successfully
val registered = new AtomicBoolean(false)