This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new d2d14381d [CELEBORN-695] Fix UnsupportedOperationException by 
refactoring WorkerInfo
d2d14381d is described below

commit d2d14381db1047ed2c0f965ad6929c2dfb921768
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jun 19 19:38:55 2023 +0800

    [CELEBORN-695] Fix UnsupportedOperationException by refactoring WorkerInfo
    
    ### What changes were proposed in this pull request?
    Refactor WorkerInfo
    
    1. make ```diskInfos```, ```userResourceConsumption``` new maps instead of 
using the passed in reference
    2. remove ```endpoint``` from the constructor
    
    ### Why are the changes needed?
    
    When manually test stop-worker.sh with graceful turned on, I got the 
following Exception
    ```
    23/06/19 11:04:25,665 INFO [worker-forward-message-scheduler] 
RssHARetryClient: connect to master master-1-1:9097.
    23/06/19 11:04:27,168 ERROR [worker-forward-message-scheduler] 
RssHARetryClient: Send rpc with failure, has tried 15, max try 15!
    org.apache.celeborn.common.exception.CelebornException: Exception thrown in 
awaitResult:
            at 
org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:231)
            at 
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
            at 
org.apache.celeborn.common.haclient.RssHARetryClient.sendMessageInner(RssHARetryClient.java:150)
            at 
org.apache.celeborn.common.haclient.RssHARetryClient.askSync(RssHARetryClient.java:118)
            at 
org.apache.celeborn.service.deploy.worker.Worker.org$apache$celeborn$service$deploy$worker$Worker$$heartBeatToMaster(Worker.scala:306)
            at 
org.apache.celeborn.service.deploy.worker.Worker$$anon$1.$anonfun$run$1(Worker.scala:332)
            at 
org.apache.celeborn.common.util.Utils$.tryLogNonFatalError(Utils.scala:186)
            at 
org.apache.celeborn.service.deploy.worker.Worker$$anon$1.run(Worker.scala:332)
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    Caused by: org.apache.celeborn.common.exception.CelebornIOException: remove
            at 
org.apache.celeborn.service.deploy.master.clustermeta.ha.HAHelper.sendFailure(HAHelper.java:65)
            at 
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:210)
            at 
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:315)
            at 
org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
            at 
org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:222)
            at 
org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:110)
            at 
org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:229)
            ... 3 more
    Caused by: java.lang.UnsupportedOperationException: remove
            at 
scala.collection.convert.Wrappers$MapWrapper$$anon$2$$anon$3.remove(Wrappers.scala:236)
            at java.util.AbstractMap.remove(AbstractMap.java:254)
            at 
org.apache.celeborn.common.meta.WorkerInfo.$anonfun$updateThenGetDiskInfos$2(WorkerInfo.scala:225)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at scala.collection.IterableLike.foreach(IterableLike.scala:74)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
            at 
org.apache.celeborn.common.meta.WorkerInfo.updateThenGetDiskInfos(WorkerInfo.scala:224)
            at 
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.lambda$updateWorkerHeartbeatMeta$5(AbstractMetaManager.java:205)
            at java.util.Optional.ifPresent(Optional.java:159)
            at 
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager.updateWorkerHeartbeatMeta(AbstractMetaManager.java:203)
            at 
org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager.handleWorkerHeartbeat(SingleMasterMetaManager.java:105)
            at 
org.apache.celeborn.service.deploy.master.Master.org$apache$celeborn$service$deploy$master$Master$$handleHeartbeatFromWorker(Master.scala:428)
            at 
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$20(Master.scala:326)
            at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
            at 
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:207)
            ... 8 more
    ```
    
    According to the suggestion from 
https://github.com/apache/incubator-celeborn/pull/1602#issuecomment-1596722991
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it fixes bug described  in 
https://github.com/apache/incubator-celeborn/pull/1602
    
    ### How was this patch tested?
    UTs and manual test.
    
    Closes #1605 from waitinfuture/695.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 7d634db54712a0924862581a8c9630f3b22cd1cd)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/common/meta/WorkerInfo.scala   | 34 +++++--------
 .../apache/celeborn/common/util/PbSerDeUtils.scala |  3 +-
 .../celeborn/common/meta/WorkerInfoSuite.scala     | 56 ++++++++--------------
 .../celeborn/common/util/PbSerDeUtilsTest.scala    |  4 +-
 .../master/clustermeta/AbstractMetaManager.java    | 22 ++-------
 .../celeborn/service/deploy/master/Master.scala    |  6 +--
 .../master/SlotsAllocatorRackAwareSuiteJ.java      | 12 ++---
 .../deploy/master/SlotsAllocatorSuiteJ.java        |  6 +--
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 27 ++++-------
 .../clustermeta/ha/MasterStateMachineSuiteJ.java   |  6 +--
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 27 ++++-------
 .../celeborn/service/deploy/worker/Worker.scala    |  3 +-
 12 files changed, 71 insertions(+), 135 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 02f036bcb..b1bf51cd8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -34,12 +34,19 @@ class WorkerInfo(
     val pushPort: Int,
     val fetchPort: Int,
     val replicatePort: Int,
-    val diskInfos: util.Map[String, DiskInfo],
-    val userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
-    var endpoint: RpcEndpointRef) extends Serializable with Logging {
+    _diskInfos: util.Map[String, DiskInfo],
+    _userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption]) 
extends Serializable
+  with Logging {
   var unknownDiskSlots = new java.util.HashMap[String, Integer]()
   var networkLocation = "/default-rack"
   var lastHeartbeat: Long = 0
+  val diskInfos =
+    if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, 
DiskInfo](_diskInfos) else null
+  val userResourceConsumption =
+    if (_userResourceConsumption != null)
+      JavaUtils.newConcurrentHashMap[UserIdentifier, 
ResourceConsumption](_userResourceConsumption)
+    else null
+  var endpoint: RpcEndpointRef = null
 
   def this(host: String, rpcPort: Int, pushPort: Int, fetchPort: Int, 
replicatePort: Int) {
     this(
@@ -49,26 +56,7 @@ class WorkerInfo(
       fetchPort,
       replicatePort,
       new util.HashMap[String, DiskInfo](),
-      JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](),
-      null)
-  }
-
-  def this(
-      host: String,
-      rpcPort: Int,
-      pushPort: Int,
-      fetchPort: Int,
-      replicatePort: Int,
-      endpoint: RpcEndpointRef) {
-    this(
-      host,
-      rpcPort,
-      pushPort,
-      fetchPort,
-      replicatePort,
-      new util.HashMap[String, DiskInfo](),
-      JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](),
-      endpoint)
+      new util.HashMap[UserIdentifier, ResourceConsumption]())
   }
 
   val allocationBuckets = new Array[Int](61)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index e6e7a1e6f..be4aab245 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -191,8 +191,7 @@ object PbSerDeUtils {
       pbWorkerInfo.getFetchPort,
       pbWorkerInfo.getReplicatePort,
       disks,
-      userResourceConsumption,
-      null)
+      userResourceConsumption)
   }
 
   def toPbWorkerInfo(
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index 8ccf1d295..221dffa47 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -56,7 +56,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
       replicatePort: Int,
       workerInfos: jMap[WorkerInfo, util.Map[String, Integer]],
       allocationMap: util.Map[String, Integer]): Unit = {
-    val worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort, null)
+    val worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort)
     val realWorker = workerInfos.get(worker)
     assertNotNull(s"Worker $worker didn't exist.", realWorker)
   }
@@ -71,7 +71,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
       JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]()
     userResourceConsumption.put(UserIdentifier("tenant1", "name1"), 
ResourceConsumption(1, 1, 1, 1))
     val worker =
-      new WorkerInfo("localhost", 10000, 10001, 10002, 10003, disks, 
userResourceConsumption, null)
+      new WorkerInfo("localhost", 10000, 10001, 10002, 10003, disks, 
userResourceConsumption)
 
     val allocatedSlots = new AtomicInteger(0)
     val shuffleKey = "appId-shuffleId"
@@ -137,32 +137,32 @@ class WorkerInfoSuite extends CelebornFunSuite {
   }
 
   test("WorkerInfo not equals when host different.") {
-    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
-    val worker2 = new WorkerInfo("h2", 10001, 10002, 10003, 1000, null, null, 
null)
+    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+    val worker2 = new WorkerInfo("h2", 10001, 10002, 10003, 1000, null, null)
     assertNotEquals(worker1, worker2)
   }
 
   test("WorkerInfo not equals when rpc port different.") {
-    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
-    val worker2 = new WorkerInfo("h1", 20001, 10002, 10003, 1000, null, null, 
null)
+    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+    val worker2 = new WorkerInfo("h1", 20001, 10002, 10003, 1000, null, null)
     assertNotEquals(worker1, worker2)
   }
 
   test("WorkerInfo not equals when push port different.") {
-    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
-    val worker2 = new WorkerInfo("h1", 10001, 20002, 10003, 1000, null, null, 
null)
+    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+    val worker2 = new WorkerInfo("h1", 10001, 20002, 10003, 1000, null, null)
     assertNotEquals(worker1, worker2)
   }
 
   test("WorkerInfo not equals when fetch port different.") {
-    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
-    val worker2 = new WorkerInfo("h1", 10001, 10002, 20003, 1000, null, null, 
null)
+    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+    val worker2 = new WorkerInfo("h1", 10001, 10002, 20003, 1000, null, null)
     assertNotEquals(worker1, worker2)
   }
 
   test("WorkerInfo not equals when replicate port different.") {
-    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
-    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 2000, null, null, 
null)
+    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 2000, null, null)
     assertNotEquals(worker1, worker2)
   }
 
@@ -174,9 +174,8 @@ class WorkerInfoSuite extends CelebornFunSuite {
       10003,
       1000,
       new util.HashMap[String, DiskInfo](),
-      null,
       null)
-    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
+    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
     assertEquals(worker1, worker2)
   }
 
@@ -188,31 +187,20 @@ class WorkerInfoSuite extends CelebornFunSuite {
       10003,
       1000,
       null,
-      new util.HashMap[UserIdentifier, ResourceConsumption](),
-      null)
-    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
+      new util.HashMap[UserIdentifier, ResourceConsumption]())
+    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
     assertEquals(worker1, worker2)
   }
 
   test("WorkerInfo equals when endpoint different") {
-    val mockEndpoint = new RpcEndpointRef(new CelebornConf()) {
-
-      override def address: RpcAddress = ???
-
-      override def name: String = ???
-
-      override def send(message: Any): Unit = ???
-
-      override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): 
concurrent.Future[T] = ???
-    }
-    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
mockEndpoint)
-    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null, 
null)
+    val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
+    val worker2 = new WorkerInfo("h1", 10001, 10002, 10003, 1000, null, null)
     assertEquals(worker1, worker2)
   }
 
   test("WorkerInfo toString output") {
     val worker1 = new WorkerInfo("h1", 10001, 10002, 10003, 1000)
-    val worker2 = new WorkerInfo("h2", 20001, 20002, 20003, 2000, null, null, 
null)
+    val worker2 = new WorkerInfo("h2", 20001, 20002, 20003, 2000, null, null)
 
     val worker3 = new WorkerInfo(
       "h3",
@@ -221,7 +209,6 @@ class WorkerInfoSuite extends CelebornFunSuite {
       30003,
       3000,
       new util.HashMap[String, DiskInfo](),
-      null,
       null)
 
     val disks = new util.HashMap[String, DiskInfo]()
@@ -236,8 +223,6 @@ class WorkerInfoSuite extends CelebornFunSuite {
     val conf = new CelebornConf()
     val endpointAddress = new RpcEndpointAddress(new RpcAddress("localhost", 
12345), "mockRpc")
     val rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345, 
conf, 64)
-    val rpcEndpointRef =
-      new NettyRpcEndpointRef(conf, endpointAddress, 
rpcEnv.asInstanceOf[NettyRpcEnv])
     val worker4 = new WorkerInfo(
       "h4",
       40001,
@@ -245,8 +230,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
       40003,
       4000,
       disks,
-      userResourceConsumption,
-      rpcEndpointRef)
+      userResourceConsumption)
 
     val placeholder = ""
     val exp1 =
@@ -304,7 +288,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
          |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2147483647, 
avgFlushTime: 2, avgFetchTime: 2, activeSlots: 20) status: HEALTHY dirs 
$placeholder
          |UserResourceConsumption: $placeholder
          |  UserIdentifier: `tenant1`.`name1`, ResourceConsumption: 
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, 
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
-         |WorkerRef: NettyRpcEndpointRef(rss://mockRpc@localhost:12345)
+         |WorkerRef: null
          |""".stripMargin;
 
     println(worker1)
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index fa60db3f7..8ac9c7138 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -68,9 +68,9 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
   userResourceConsumption.put(userIdentifier2, resourceConsumption2)
 
   val workerInfo1 =
-    new WorkerInfo("localhost", 1001, 1002, 1003, 1004, diskInfos, 
userResourceConsumption, null)
+    new WorkerInfo("localhost", 1001, 1002, 1003, 1004, diskInfos, 
userResourceConsumption)
   val workerInfo2 =
-    new WorkerInfo("localhost", 2001, 2002, 2003, 2004, diskInfos, 
userResourceConsumption, null)
+    new WorkerInfo("localhost", 2001, 2002, 2003, 2004, diskInfos, 
userResourceConsumption)
 
   val partitionLocation1 =
     new PartitionLocation(0, 0, "host1", 10, 9, 8, 14, 
PartitionLocation.Mode.SLAVE)
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 20ccd764b..5275ee720 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -152,7 +152,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
 
   public void updateWorkerLostMeta(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort) {
-    WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort, null);
+    WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort);
     workerLostEvents.add(worker);
     // remove worker from workers
     synchronized (workers) {
@@ -166,7 +166,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
 
   public void updateWorkerRemoveMeta(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort) {
-    WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort, null);
+    WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort);
     // remove worker from workers
     synchronized (workers) {
       workers.remove(worker);
@@ -188,14 +188,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       long time) {
     WorkerInfo worker =
         new WorkerInfo(
-            host,
-            rpcPort,
-            pushPort,
-            fetchPort,
-            replicatePort,
-            disks,
-            userResourceConsumption,
-            null);
+            host, rpcPort, pushPort, fetchPort, replicatePort, disks, 
userResourceConsumption);
     AtomicLong availableSlots = new AtomicLong();
     LOG.debug("update worker {}:{} heart beat {}", host, rpcPort, disks);
     synchronized (workers) {
@@ -228,14 +221,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption) {
     WorkerInfo workerInfo =
         new WorkerInfo(
-            host,
-            rpcPort,
-            pushPort,
-            fetchPort,
-            replicatePort,
-            disks,
-            userResourceConsumption,
-            null);
+            host, rpcPort, pushPort, fetchPort, replicatePort, disks, 
userResourceConsumption);
     workerInfo.lastHeartbeat_$eq(System.currentTimeMillis());
     
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
     workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 563580101..7d7811a1a 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -454,8 +454,7 @@ private[celeborn] class Master(
       fetchPort,
       replicatePort,
       new util.HashMap[String, DiskInfo](),
-      JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](),
-      null)
+      JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
     val worker: WorkerInfo = workersSnapShot
       .asScala
       .find(_ == targetWorker)
@@ -489,8 +488,7 @@ private[celeborn] class Master(
         fetchPort,
         replicatePort,
         disks,
-        userResourceConsumption,
-        null)
+        userResourceConsumption)
     if (workersSnapShot.contains(workerToRegister)) {
       logWarning(s"Receive RegisterWorker while worker" +
         s" ${workerToRegister.toString()} already exists, re-register.")
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index 4ed0e6f48..bc7b54b33 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -120,12 +120,12 @@ public class SlotsAllocatorRackAwareSuiteJ {
 
   private List<WorkerInfo> prepareWorkers(CelebornRackResolver resolver) {
     ArrayList<WorkerInfo> workers = new ArrayList<>(3);
-    workers.add(new WorkerInfo("host1", 9, 10, 110, 113, new HashMap<>(), 
null, null));
-    workers.add(new WorkerInfo("host2", 9, 11, 111, 114, new HashMap<>(), 
null, null));
-    workers.add(new WorkerInfo("host3", 9, 12, 112, 115, new HashMap<>(), 
null, null));
-    workers.add(new WorkerInfo("host4", 9, 10, 110, 113, new HashMap<>(), 
null, null));
-    workers.add(new WorkerInfo("host5", 9, 11, 111, 114, new HashMap<>(), 
null, null));
-    workers.add(new WorkerInfo("host6", 9, 12, 112, 115, new HashMap<>(), 
null, null));
+    workers.add(new WorkerInfo("host1", 9, 10, 110, 113, new HashMap<>(), 
null));
+    workers.add(new WorkerInfo("host2", 9, 11, 111, 114, new HashMap<>(), 
null));
+    workers.add(new WorkerInfo("host3", 9, 12, 112, 115, new HashMap<>(), 
null));
+    workers.add(new WorkerInfo("host4", 9, 10, 110, 113, new HashMap<>(), 
null));
+    workers.add(new WorkerInfo("host5", 9, 11, 111, 114, new HashMap<>(), 
null));
+    workers.add(new WorkerInfo("host6", 9, 12, 112, 115, new HashMap<>(), 
null));
 
     workers.forEach(
         new Consumer<WorkerInfo>() {
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 900752672..ef62ac935 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -130,9 +130,9 @@ public class SlotsAllocatorSuiteJ {
     disks3.put("/mnt/disk3", diskInfo9);
 
     ArrayList<WorkerInfo> workers = new ArrayList<>(3);
-    workers.add(new WorkerInfo("host1", 9, 10, 110, 113, disks1, null, null));
-    workers.add(new WorkerInfo("host2", 9, 11, 111, 114, disks2, null, null));
-    workers.add(new WorkerInfo("host3", 9, 12, 112, 115, disks3, null, null));
+    workers.add(new WorkerInfo("host1", 9, 10, 110, 113, disks1, null));
+    workers.add(new WorkerInfo("host2", 9, 11, 111, 114, disks2, null));
+    workers.add(new WorkerInfo("host3", 9, 12, 112, 115, disks3, null));
     return workers;
   }
 
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index f58ebba2a..115426c8b 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -220,8 +220,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -230,8 +229,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
     WorkerInfo workerInfo3 =
         new WorkerInfo(
             HOSTNAME3,
@@ -240,8 +238,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT3,
             REPLICATEPORT3,
             disks3,
-            userResourceConsumption3,
-            dummyRef);
+            userResourceConsumption3);
 
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocation = new HashMap<>();
@@ -369,8 +366,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -379,8 +375,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
 
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocation = new HashMap<>();
@@ -435,8 +430,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -445,8 +439,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
 
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocation = new HashMap<>();
@@ -587,8 +580,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -597,8 +589,7 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
 
     List<WorkerInfo> failedWorkers = new ArrayList<>();
     failedWorkers.add(workerInfo1);
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index df2e9bc26..c092fbf7a 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -246,9 +246,9 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     userResourceConsumption3.put(
         new UserIdentifier("tenant3", "name3"), new ResourceConsumption(3000, 
3, 3000, 3));
 
-    WorkerInfo info1 = new WorkerInfo("host1", 1, 2, 3, 10, disks1, 
userResourceConsumption1, null);
-    WorkerInfo info2 = new WorkerInfo("host2", 4, 5, 6, 11, disks2, 
userResourceConsumption2, null);
-    WorkerInfo info3 = new WorkerInfo("host3", 7, 8, 9, 12, disks3, 
userResourceConsumption3, null);
+    WorkerInfo info1 = new WorkerInfo("host1", 1, 2, 3, 10, disks1, 
userResourceConsumption1);
+    WorkerInfo info2 = new WorkerInfo("host2", 4, 5, 6, 11, disks2, 
userResourceConsumption2);
+    WorkerInfo info3 = new WorkerInfo("host3", 7, 8, 9, 12, disks3, 
userResourceConsumption3);
 
     String host1 = "host1";
     String host2 = "host2";
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 6fad572af..fdb0ca0e1 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -368,8 +368,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -378,8 +377,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
     WorkerInfo workerInfo3 =
         new WorkerInfo(
             HOSTNAME3,
@@ -388,8 +386,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT3,
             REPLICATEPORT3,
             disks3,
-            userResourceConsumption3,
-            dummyRef);
+            userResourceConsumption3);
 
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocation1 = new HashMap<>();
@@ -566,8 +563,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -576,8 +572,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocations = new HashMap<>();
     allocations.put("disk1", 5);
@@ -640,8 +635,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -650,8 +644,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
 
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocations = new HashMap<>();
@@ -869,8 +862,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1,
-            dummyRef);
+            userResourceConsumption1);
     WorkerInfo workerInfo2 =
         new WorkerInfo(
             HOSTNAME2,
@@ -879,8 +871,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT2,
             REPLICATEPORT2,
             disks2,
-            userResourceConsumption2,
-            dummyRef);
+            userResourceConsumption2);
 
     List<WorkerInfo> failedWorkers = new ArrayList<>();
     failedWorkers.add(workerInfo1);
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 361a715ad..6cdbf7913 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -207,8 +207,7 @@ private[celeborn] class Worker(
       fetchPort,
       replicatePort,
       diskInfos,
-      userResourceConsumption,
-      controller.self)
+      userResourceConsumption)
 
   // whether this Worker registered to Master successfully
   val registered = new AtomicBoolean(false)

Reply via email to