This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new affc8f104 [CELEBORN-805] Immediate shutdown of server upon completion of unit test to prevent potential resource leakage
affc8f104 is described below

commit affc8f104c0f6957f121b18c0cda219dfca8bb8b
Author: Fu Chen <[email protected]>
AuthorDate: Tue Jul 18 13:12:51 2023 +0800

    [CELEBORN-805] Immediate shutdown of server upon completion of unit test to prevent potential resource leakage
    
    ### What changes were proposed in this pull request?
    
    As title
    
    ### Why are the changes needed?
    
    Recently, while running the sbt build tests, I noticed that certain resources, such as ports and threads, were not being released promptly.
    
    This pull request adds a new method, `shutdown(graceful: Boolean)`, to the `Service` abstract class. `MiniClusterFeature.shutdownMiniCluster` now calls `worker.shutdown(graceful = false)` on every worker. This stops the worker's servers and thread pools immediately instead of waiting for a graceful shutdown. The goal is to prevent resource leaks (listening ports and threads) from accumulating during CI runs.
    
    Before this PR, the unit tests in the `client`, `common`, `master`, `service`, and `worker` modules left listening ports open after they finished:
    
    ```
    $ jps
    1138131 Jps
    1130743 sbt-launch-1.9.0.jar
    $ netstat -lntp | grep 1130743
    (Not all processes could be identified, non-owned process info
     will not be shown, you would have to be root to see it all.)
    tcp        0      0 127.0.0.1:12345         0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:41563           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:42905           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:44419           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:45025           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:44799           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:39053           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:39029           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:39475           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:40153           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:33051           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:33449           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:34073           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:35347           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:35971           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 0.0.0.0:36799           0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 192.168.1.151:40775     0.0.0.0:*               LISTEN      1130743/java
    tcp        0      0 192.168.1.151:44457     0.0.0.0:*               LISTEN      1130743/java
    ```
    
    After this PR:
    
    ```
    $ jps
    1114423 Jps
    1107544 sbt-launch-1.9.0.jar
    $ netstat -lntp | grep 1107544
    (Not all processes could be identified, non-owned process info
     will not be shown, you would have to be root to see it all.)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #1727 from cfmcgrady/shutdown.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 7c6644b1a7509f0d574252270e46fbc20a8e4bb2)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../common/network/server/TransportServer.java     |  16 ++-
 .../celeborn/common/meta/WorkerInfoSuite.scala     | 152 +++++++++++----------
 .../celeborn/server/common/HttpService.scala       |  13 +-
 .../apache/celeborn/server/common/Service.scala    |   2 +
 .../celeborn/server/common/http/HttpServer.scala   |  14 +-
 .../celeborn/service/deploy/worker/Worker.scala    |  47 +++++--
 .../service/deploy/MiniClusterFeature.scala        |   3 +-
 .../deploy/worker/storage/WorkerSuite.scala        |  25 +++-
 8 files changed, 174 insertions(+), 98 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index 1195bafa4..f323786bf 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -130,16 +130,28 @@ public class TransportServer implements Closeable {
 
   @Override
   public void close() {
+    shutdown(true);
+  }
+
+  public void shutdown(boolean graceful) {
     if (channelFuture != null) {
       // close is a local operation and should finish within milliseconds; 
timeout just to be safe
       channelFuture.channel().close().awaitUninterruptibly(10, 
TimeUnit.SECONDS);
       channelFuture = null;
     }
     if (bootstrap != null && bootstrap.config().group() != null) {
-      bootstrap.config().group().shutdownGracefully();
+      if (graceful) {
+        bootstrap.config().group().shutdownGracefully();
+      } else {
+        bootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS);
+      }
     }
     if (bootstrap != null && bootstrap.config().childGroup() != null) {
-      bootstrap.config().childGroup().shutdownGracefully();
+      if (graceful) {
+        bootstrap.config().childGroup().shutdownGracefully();
+      } else {
+        bootstrap.config().childGroup().shutdownGracefully(0, 0, 
TimeUnit.SECONDS);
+      }
     }
     bootstrap = null;
   }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index a3ad20242..93f210f1f 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -222,83 +222,85 @@ class WorkerInfoSuite extends CelebornFunSuite {
       ResourceConsumption(20971520, 1, 52428800, 1))
     val conf = new CelebornConf()
     val endpointAddress = new RpcEndpointAddress(new RpcAddress("localhost", 
12345), "mockRpc")
-    val rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345, 
conf, 64)
-    val worker4 = new WorkerInfo(
-      "h4",
-      40001,
-      40002,
-      40003,
-      4000,
-      disks,
-      userResourceConsumption)
+    var rpcEnv: RpcEnv = null
+    try {
+      rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345, conf, 
64)
+      val worker4 = new WorkerInfo(
+        "h4",
+        40001,
+        40002,
+        40003,
+        4000,
+        disks,
+        userResourceConsumption)
 
-    val placeholder = ""
-    val exp1 =
-      s"""
-         |Host: h1
-         |RpcPort: 10001
-         |PushPort: 10002
-         |FetchPort: 10003
-         |ReplicatePort: 1000
-         |SlotsUsed: 0
-         |LastHeartbeat: 0
-         |Disks: empty
-         |UserResourceConsumption: empty
-         |WorkerRef: null
-         |""".stripMargin
+      val placeholder = ""
+      val exp1 =
+        s"""
+           |Host: h1
+           |RpcPort: 10001
+           |PushPort: 10002
+           |FetchPort: 10003
+           |ReplicatePort: 1000
+           |SlotsUsed: 0
+           |LastHeartbeat: 0
+           |Disks: empty
+           |UserResourceConsumption: empty
+           |WorkerRef: null
+           |""".stripMargin
 
-    val exp2 =
-      """
-        |Host: h2
-        |RpcPort: 20001
-        |PushPort: 20002
-        |FetchPort: 20003
-        |ReplicatePort: 2000
-        |SlotsUsed: 0
-        |LastHeartbeat: 0
-        |Disks: empty
-        |UserResourceConsumption: empty
-        |WorkerRef: null
-        |""".stripMargin
-    val exp3 =
-      s"""
-         |Host: h3
-         |RpcPort: 30001
-         |PushPort: 30002
-         |FetchPort: 30003
-         |ReplicatePort: 3000
-         |SlotsUsed: 0
-         |LastHeartbeat: 0
-         |Disks: empty
-         |UserResourceConsumption: empty
-         |WorkerRef: null
-         |""".stripMargin
-    val exp4 =
-      s"""
-         |Host: h4
-         |RpcPort: 40001
-         |PushPort: 40002
-         |FetchPort: 40003
-         |ReplicatePort: 4000
-         |SlotsUsed: 60
-         |LastHeartbeat: 0
-         |Disks: $placeholder
-         |  DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, 
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs 
$placeholder
-         |  DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, 
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs 
$placeholder
-         |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, 
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs 
$placeholder
-         |UserResourceConsumption: $placeholder
-         |  UserIdentifier: `tenant1`.`name1`, ResourceConsumption: 
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, 
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
-         |WorkerRef: null
-         |""".stripMargin;
+      val exp2 =
+        """
+          |Host: h2
+          |RpcPort: 20001
+          |PushPort: 20002
+          |FetchPort: 20003
+          |ReplicatePort: 2000
+          |SlotsUsed: 0
+          |LastHeartbeat: 0
+          |Disks: empty
+          |UserResourceConsumption: empty
+          |WorkerRef: null
+          |""".stripMargin
+      val exp3 =
+        s"""
+           |Host: h3
+           |RpcPort: 30001
+           |PushPort: 30002
+           |FetchPort: 30003
+           |ReplicatePort: 3000
+           |SlotsUsed: 0
+           |LastHeartbeat: 0
+           |Disks: empty
+           |UserResourceConsumption: empty
+           |WorkerRef: null
+           |""".stripMargin
+      val exp4 =
+        s"""
+           |Host: h4
+           |RpcPort: 40001
+           |PushPort: 40002
+           |FetchPort: 40003
+           |ReplicatePort: 4000
+           |SlotsUsed: 60
+           |LastHeartbeat: 0
+           |Disks: $placeholder
+           |  DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, 
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs 
$placeholder
+           |  DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, 
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs 
$placeholder
+           |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, 
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs 
$placeholder
+           |UserResourceConsumption: $placeholder
+           |  UserIdentifier: `tenant1`.`name1`, ResourceConsumption: 
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, 
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
+           |WorkerRef: null
+           |""".stripMargin;
 
-    println(worker1)
-    println(worker2)
-    println(worker3)
-    println(worker4)
-
-    assertEquals(exp1, 
worker1.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
-    assertEquals(exp2, 
worker2.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
-    assertEquals(exp3, 
worker3.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
-    assertEquals(exp4, 
worker4.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+      assertEquals(exp1, 
worker1.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+      assertEquals(exp2, 
worker2.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+      assertEquals(exp3, 
worker3.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+      assertEquals(exp4, 
worker4.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+    } finally {
+      if (null != rpcEnv) {
+        rpcEnv.shutdown()
+      }
+    }
   }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 3cfb34f3c..3f2d19b90 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -106,7 +106,18 @@ abstract class HttpService extends Service with Logging {
   }
 
   override def close(): Unit = {
-    httpServer.stop()
+    // may be null when running the unit test
+    if (null != httpServer) {
+      httpServer.stop(true)
+    }
     super.close()
   }
+
+  override def shutdown(graceful: Boolean): Unit = {
+    // may be null when running the unit test
+    if (null != httpServer) {
+      httpServer.stop(graceful)
+    }
+    super.shutdown(graceful)
+  }
 }
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
index 592af3c02..129f28f4d 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
@@ -36,6 +36,8 @@ abstract class Service extends Logging {
   }
 
   def close(): Unit = {}
+
+  def shutdown(graceful: Boolean): Unit = {}
 }
 
 object Service {
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index 115843c36..8f4f43891 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -56,7 +56,7 @@ class HttpServer(
     isStarted = true
   }
 
-  def stop(): Unit = synchronized {
+  def stop(graceful: Boolean): Unit = synchronized {
     if (isStarted) {
       logInfo(s"$role: Stopping HttpServer")
       if (bindFuture != null) {
@@ -66,12 +66,20 @@ class HttpServer(
       }
       if (bootstrap != null && bootstrap.config.group != null) {
         Utils.tryLogNonFatalError {
-          bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
+          if (graceful) {
+            bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
+          } else {
+            bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
+          }
         }
       }
       if (bootstrap != null && bootstrap.config.childGroup != null) {
         Utils.tryLogNonFatalError {
-          bootstrap.config.childGroup.shutdownGracefully(3, 5, 
TimeUnit.SECONDS)
+          if (graceful) {
+            bootstrap.config.childGroup.shutdownGracefully(3, 5, 
TimeUnit.SECONDS)
+          } else {
+            bootstrap.config.childGroup.shutdownGracefully(0, 0, 
TimeUnit.SECONDS)
+          }
         }
       }
       bootstrap = null
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 572d03247..0ad8b5d0b 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -393,37 +393,62 @@ private[celeborn] class Worker(
   }
 
   override def close(): Unit = synchronized {
+    shutdown(gracefulShutdown)
+  }
+
+  override def shutdown(graceful: Boolean): Unit = {
     if (!stopped) {
       logInfo("Stopping Worker.")
 
       if (sendHeartbeatTask != null) {
-        sendHeartbeatTask.cancel(true)
+        if (graceful) {
+          sendHeartbeatTask.cancel(false)
+        } else {
+          sendHeartbeatTask.cancel(true)
+        }
         sendHeartbeatTask = null
       }
       if (checkFastFailTask != null) {
-        checkFastFailTask.cancel(true)
+        if (graceful) {
+          checkFastFailTask.cancel(false)
+        } else {
+          checkFastFailTask.cancel(true)
+        }
         checkFastFailTask = null
       }
-      forwardMessageScheduler.shutdownNow()
-      replicateThreadPool.shutdownNow()
-      commitThreadPool.shutdownNow()
-      asyncReplyPool.shutdownNow()
-      partitionsSorter.close()
+      if (graceful) {
+        forwardMessageScheduler.shutdown()
+        replicateThreadPool.shutdown()
+        commitThreadPool.shutdown()
+        asyncReplyPool.shutdown()
+        partitionsSorter.close()
+      } else {
+        forwardMessageScheduler.shutdownNow()
+        replicateThreadPool.shutdownNow()
+        commitThreadPool.shutdownNow()
+        asyncReplyPool.shutdownNow()
+        partitionsSorter.close()
+      }
 
       if (null != storageManager) {
         storageManager.close()
       }
-      memoryManager.close();
+      memoryManager.close()
 
       masterClient.close()
-      replicateServer.close()
-      fetchServer.close()
+      replicateServer.shutdown(graceful)
+      fetchServer.shutdown(graceful)
+      // TODO: `pushServer` never be closed before this PR.
+      pushServer.shutdown(graceful)
 
-      super.close()
+      super.shutdown(graceful)
 
       logInfo("Worker is stopped.")
       stopped = true
     }
+    if (!graceful) {
+      shutdown.set(true)
+    }
   }
 
   private def registerWithMaster(): Unit = {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 6914d9215..a61854057 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -130,7 +130,8 @@ trait MiniClusterFeature extends Logging {
     // interrupt threads
     Thread.sleep(5000)
     workerInfos.foreach {
-      case (_, thread) =>
+      case (worker, thread) =>
+        worker.shutdown(graceful = false)
         thread.interrupt()
     }
     workerInfos.clear()
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index da78af885..01c823c0c 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.junit.Assert
 import org.mockito.MockitoSugar._
+import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.CelebornConf
@@ -33,12 +34,26 @@ import 
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
 import org.apache.celeborn.common.util.JavaUtils
 import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
 
-class WorkerSuite extends AnyFunSuite {
-  val conf = new CelebornConf()
-  val workerArgs = new WorkerArguments(Array(), conf)
+class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
+  private var worker: Worker = _
+  private val conf = new CelebornConf()
+  private val workerArgs = new WorkerArguments(Array(), conf)
+
+  override def beforeEach(): Unit = {
+    assert(null == worker)
+  }
+
+  override def afterEach(): Unit = {
+    if (null != worker) {
+      worker.rpcEnv.shutdown()
+      worker.shutdown(false)
+      worker = null
+    }
+  }
+
   test("clean up") {
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
-    val worker = new Worker(conf, workerArgs)
+    worker = new Worker(conf, workerArgs)
 
     val pl1 = new PartitionLocation(0, 0, "12", 0, 0, 0, 0, 
PartitionLocation.Mode.PRIMARY)
     val pl2 = new PartitionLocation(1, 0, "12", 0, 0, 0, 0, 
PartitionLocation.Mode.REPLICA)
@@ -74,7 +89,7 @@ class WorkerSuite extends AnyFunSuite {
 
   test("flush filewriters") {
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
-    val worker = new Worker(conf, workerArgs)
+    worker = new Worker(conf, workerArgs)
     val dir = new File("/tmp")
     val allWriters = new util.HashSet[FileWriter]()
     val map = JavaUtils.newConcurrentHashMap[String, FileWriter]()

Reply via email to