This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new affc8f104 [CELEBORN-805] Immediate shutdown of server upon completion
of unit test to prevent potential resource leakage
affc8f104 is described below
commit affc8f104c0f6957f121b18c0cda219dfca8bb8b
Author: Fu Chen <[email protected]>
AuthorDate: Tue Jul 18 13:12:51 2023 +0800
[CELEBORN-805] Immediate shutdown of server upon completion of unit test to
prevent potential resource leakage
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
Recently, while conducting the sbt build test, it came to my attention that
certain resources such as ports and threads were not being released promptly.
This pull request introduces a new method, `shutdown(graceful: Boolean)`,
to the `Service` trait. When tearing down the test cluster,
`MiniClusterFeature.shutdownMiniCluster` now calls `worker.shutdown(graceful =
false)`, releasing ports and threads immediately. This aims to prevent
possible resource leaks during CI runs.
Before this PR, the unit tests in the `client/common/master/service/worker`
modules left leaked ports behind.
```
$ jps
1138131 Jps
1130743 sbt-launch-1.9.0.jar
$ netstat -lntp | grep 1130743
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 127.0.0.1:12345 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:41563 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:42905 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:44419 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:45025 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:44799 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:39053 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:39029 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:39475 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:40153 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:33051 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:33449 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:34073 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:35347 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:35971 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 0.0.0.0:36799 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 192.168.1.151:40775 0.0.0.0:* LISTEN
1130743/java
tcp 0 0 192.168.1.151:44457 0.0.0.0:* LISTEN
1130743/java
```
After this PR:
```
$ jps
1114423 Jps
1107544 sbt-launch-1.9.0.jar
$ netstat -lntp | grep 1107544
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #1727 from cfmcgrady/shutdown.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 7c6644b1a7509f0d574252270e46fbc20a8e4bb2)
Signed-off-by: Cheng Pan <[email protected]>
---
.../common/network/server/TransportServer.java | 16 ++-
.../celeborn/common/meta/WorkerInfoSuite.scala | 152 +++++++++++----------
.../celeborn/server/common/HttpService.scala | 13 +-
.../apache/celeborn/server/common/Service.scala | 2 +
.../celeborn/server/common/http/HttpServer.scala | 14 +-
.../celeborn/service/deploy/worker/Worker.scala | 47 +++++--
.../service/deploy/MiniClusterFeature.scala | 3 +-
.../deploy/worker/storage/WorkerSuite.scala | 25 +++-
8 files changed, 174 insertions(+), 98 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
index 1195bafa4..f323786bf 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java
@@ -130,16 +130,28 @@ public class TransportServer implements Closeable {
@Override
public void close() {
+ shutdown(true);
+ }
+
+ public void shutdown(boolean graceful) {
if (channelFuture != null) {
// close is a local operation and should finish within milliseconds;
timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10,
TimeUnit.SECONDS);
channelFuture = null;
}
if (bootstrap != null && bootstrap.config().group() != null) {
- bootstrap.config().group().shutdownGracefully();
+ if (graceful) {
+ bootstrap.config().group().shutdownGracefully();
+ } else {
+ bootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS);
+ }
}
if (bootstrap != null && bootstrap.config().childGroup() != null) {
- bootstrap.config().childGroup().shutdownGracefully();
+ if (graceful) {
+ bootstrap.config().childGroup().shutdownGracefully();
+ } else {
+ bootstrap.config().childGroup().shutdownGracefully(0, 0,
TimeUnit.SECONDS);
+ }
}
bootstrap = null;
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index a3ad20242..93f210f1f 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -222,83 +222,85 @@ class WorkerInfoSuite extends CelebornFunSuite {
ResourceConsumption(20971520, 1, 52428800, 1))
val conf = new CelebornConf()
val endpointAddress = new RpcEndpointAddress(new RpcAddress("localhost",
12345), "mockRpc")
- val rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345,
conf, 64)
- val worker4 = new WorkerInfo(
- "h4",
- 40001,
- 40002,
- 40003,
- 4000,
- disks,
- userResourceConsumption)
+ var rpcEnv: RpcEnv = null
+ try {
+ rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345, conf,
64)
+ val worker4 = new WorkerInfo(
+ "h4",
+ 40001,
+ 40002,
+ 40003,
+ 4000,
+ disks,
+ userResourceConsumption)
- val placeholder = ""
- val exp1 =
- s"""
- |Host: h1
- |RpcPort: 10001
- |PushPort: 10002
- |FetchPort: 10003
- |ReplicatePort: 1000
- |SlotsUsed: 0
- |LastHeartbeat: 0
- |Disks: empty
- |UserResourceConsumption: empty
- |WorkerRef: null
- |""".stripMargin
+ val placeholder = ""
+ val exp1 =
+ s"""
+ |Host: h1
+ |RpcPort: 10001
+ |PushPort: 10002
+ |FetchPort: 10003
+ |ReplicatePort: 1000
+ |SlotsUsed: 0
+ |LastHeartbeat: 0
+ |Disks: empty
+ |UserResourceConsumption: empty
+ |WorkerRef: null
+ |""".stripMargin
- val exp2 =
- """
- |Host: h2
- |RpcPort: 20001
- |PushPort: 20002
- |FetchPort: 20003
- |ReplicatePort: 2000
- |SlotsUsed: 0
- |LastHeartbeat: 0
- |Disks: empty
- |UserResourceConsumption: empty
- |WorkerRef: null
- |""".stripMargin
- val exp3 =
- s"""
- |Host: h3
- |RpcPort: 30001
- |PushPort: 30002
- |FetchPort: 30003
- |ReplicatePort: 3000
- |SlotsUsed: 0
- |LastHeartbeat: 0
- |Disks: empty
- |UserResourceConsumption: empty
- |WorkerRef: null
- |""".stripMargin
- val exp4 =
- s"""
- |Host: h4
- |RpcPort: 40001
- |PushPort: 40002
- |FetchPort: 40003
- |ReplicatePort: 4000
- |SlotsUsed: 60
- |LastHeartbeat: 0
- |Disks: $placeholder
- | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB,
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs
$placeholder
- | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB,
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs
$placeholder
- | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB,
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs
$placeholder
- |UserResourceConsumption: $placeholder
- | UserIdentifier: `tenant1`.`name1`, ResourceConsumption:
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1,
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
- |WorkerRef: null
- |""".stripMargin;
+ val exp2 =
+ """
+ |Host: h2
+ |RpcPort: 20001
+ |PushPort: 20002
+ |FetchPort: 20003
+ |ReplicatePort: 2000
+ |SlotsUsed: 0
+ |LastHeartbeat: 0
+ |Disks: empty
+ |UserResourceConsumption: empty
+ |WorkerRef: null
+ |""".stripMargin
+ val exp3 =
+ s"""
+ |Host: h3
+ |RpcPort: 30001
+ |PushPort: 30002
+ |FetchPort: 30003
+ |ReplicatePort: 3000
+ |SlotsUsed: 0
+ |LastHeartbeat: 0
+ |Disks: empty
+ |UserResourceConsumption: empty
+ |WorkerRef: null
+ |""".stripMargin
+ val exp4 =
+ s"""
+ |Host: h4
+ |RpcPort: 40001
+ |PushPort: 40002
+ |FetchPort: 40003
+ |ReplicatePort: 4000
+ |SlotsUsed: 60
+ |LastHeartbeat: 0
+ |Disks: $placeholder
+ | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB,
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs
$placeholder
+ | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB,
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs
$placeholder
+ | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB,
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs
$placeholder
+ |UserResourceConsumption: $placeholder
+ | UserIdentifier: `tenant1`.`name1`, ResourceConsumption:
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1,
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
+ |WorkerRef: null
+ |""".stripMargin;
- println(worker1)
- println(worker2)
- println(worker3)
- println(worker4)
-
- assertEquals(exp1,
worker1.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
- assertEquals(exp2,
worker2.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
- assertEquals(exp3,
worker3.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
- assertEquals(exp4,
worker4.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp1,
worker1.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp2,
worker2.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp3,
worker3.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+ assertEquals(exp4,
worker4.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
+ } finally {
+ if (null != rpcEnv) {
+ rpcEnv.shutdown()
+ }
+ }
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 3cfb34f3c..3f2d19b90 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -106,7 +106,18 @@ abstract class HttpService extends Service with Logging {
}
override def close(): Unit = {
- httpServer.stop()
+ // may be null when running the unit test
+ if (null != httpServer) {
+ httpServer.stop(true)
+ }
super.close()
}
+
+ override def shutdown(graceful: Boolean): Unit = {
+ // may be null when running the unit test
+ if (null != httpServer) {
+ httpServer.stop(graceful)
+ }
+ super.shutdown(graceful)
+ }
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
index 592af3c02..129f28f4d 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala
@@ -36,6 +36,8 @@ abstract class Service extends Logging {
}
def close(): Unit = {}
+
+ def shutdown(graceful: Boolean): Unit = {}
}
object Service {
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
index 115843c36..8f4f43891 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala
@@ -56,7 +56,7 @@ class HttpServer(
isStarted = true
}
- def stop(): Unit = synchronized {
+ def stop(graceful: Boolean): Unit = synchronized {
if (isStarted) {
logInfo(s"$role: Stopping HttpServer")
if (bindFuture != null) {
@@ -66,12 +66,20 @@ class HttpServer(
}
if (bootstrap != null && bootstrap.config.group != null) {
Utils.tryLogNonFatalError {
- bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
+ if (graceful) {
+ bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
+ } else {
+ bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
+ }
}
}
if (bootstrap != null && bootstrap.config.childGroup != null) {
Utils.tryLogNonFatalError {
- bootstrap.config.childGroup.shutdownGracefully(3, 5,
TimeUnit.SECONDS)
+ if (graceful) {
+ bootstrap.config.childGroup.shutdownGracefully(3, 5,
TimeUnit.SECONDS)
+ } else {
+ bootstrap.config.childGroup.shutdownGracefully(0, 0,
TimeUnit.SECONDS)
+ }
}
}
bootstrap = null
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 572d03247..0ad8b5d0b 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -393,37 +393,62 @@ private[celeborn] class Worker(
}
override def close(): Unit = synchronized {
+ shutdown(gracefulShutdown)
+ }
+
+ override def shutdown(graceful: Boolean): Unit = {
if (!stopped) {
logInfo("Stopping Worker.")
if (sendHeartbeatTask != null) {
- sendHeartbeatTask.cancel(true)
+ if (graceful) {
+ sendHeartbeatTask.cancel(false)
+ } else {
+ sendHeartbeatTask.cancel(true)
+ }
sendHeartbeatTask = null
}
if (checkFastFailTask != null) {
- checkFastFailTask.cancel(true)
+ if (graceful) {
+ checkFastFailTask.cancel(false)
+ } else {
+ checkFastFailTask.cancel(true)
+ }
checkFastFailTask = null
}
- forwardMessageScheduler.shutdownNow()
- replicateThreadPool.shutdownNow()
- commitThreadPool.shutdownNow()
- asyncReplyPool.shutdownNow()
- partitionsSorter.close()
+ if (graceful) {
+ forwardMessageScheduler.shutdown()
+ replicateThreadPool.shutdown()
+ commitThreadPool.shutdown()
+ asyncReplyPool.shutdown()
+ partitionsSorter.close()
+ } else {
+ forwardMessageScheduler.shutdownNow()
+ replicateThreadPool.shutdownNow()
+ commitThreadPool.shutdownNow()
+ asyncReplyPool.shutdownNow()
+ partitionsSorter.close()
+ }
if (null != storageManager) {
storageManager.close()
}
- memoryManager.close();
+ memoryManager.close()
masterClient.close()
- replicateServer.close()
- fetchServer.close()
+ replicateServer.shutdown(graceful)
+ fetchServer.shutdown(graceful)
+ // TODO: `pushServer` never be closed before this PR.
+ pushServer.shutdown(graceful)
- super.close()
+ super.shutdown(graceful)
logInfo("Worker is stopped.")
stopped = true
}
+ if (!graceful) {
+ shutdown.set(true)
+ }
}
private def registerWithMaster(): Unit = {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 6914d9215..a61854057 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -130,7 +130,8 @@ trait MiniClusterFeature extends Logging {
// interrupt threads
Thread.sleep(5000)
workerInfos.foreach {
- case (_, thread) =>
+ case (worker, thread) =>
+ worker.shutdown(graceful = false)
thread.interrupt()
}
workerInfos.clear()
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index da78af885..01c823c0c 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.junit.Assert
import org.mockito.MockitoSugar._
+import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
@@ -33,12 +34,26 @@ import
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.util.JavaUtils
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
-class WorkerSuite extends AnyFunSuite {
- val conf = new CelebornConf()
- val workerArgs = new WorkerArguments(Array(), conf)
+class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
+ private var worker: Worker = _
+ private val conf = new CelebornConf()
+ private val workerArgs = new WorkerArguments(Array(), conf)
+
+ override def beforeEach(): Unit = {
+ assert(null == worker)
+ }
+
+ override def afterEach(): Unit = {
+ if (null != worker) {
+ worker.rpcEnv.shutdown()
+ worker.shutdown(false)
+ worker = null
+ }
+ }
+
test("clean up") {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
- val worker = new Worker(conf, workerArgs)
+ worker = new Worker(conf, workerArgs)
val pl1 = new PartitionLocation(0, 0, "12", 0, 0, 0, 0,
PartitionLocation.Mode.PRIMARY)
val pl2 = new PartitionLocation(1, 0, "12", 0, 0, 0, 0,
PartitionLocation.Mode.REPLICA)
@@ -74,7 +89,7 @@ class WorkerSuite extends AnyFunSuite {
test("flush filewriters") {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
- val worker = new Worker(conf, workerArgs)
+ worker = new Worker(conf, workerArgs)
val dir = new File("/tmp")
val allWriters = new util.HashSet[FileWriter]()
val map = JavaUtils.newConcurrentHashMap[String, FileWriter]()