This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 5adef544c [CELEBORN-1094] Optimize mechanism of ChunkManager expired shuffle key cleanup to avoid memory leak
5adef544c is described below
commit 5adef544cd16836f2bf4cb5b357b9c18c36c5802
Author: SteNicholas <[email protected]>
AuthorDate: Thu Nov 2 15:46:07 2023 +0800
[CELEBORN-1094] Optimize mechanism of ChunkManager expired shuffle key cleanup to avoid memory leak
### What changes were proposed in this pull request?
The `cleaner` of `Worker` executes `StorageManager#cleanupExpiredShuffleKey` to clean expired shuffle keys on a daemon cached thread pool. This optimization speeds up cleanup, including the expired shuffle keys of `ChunkStreamManager`, to avoid a memory leak.
### Why are the changes needed?
`ChunkStreamManager#streams` can leak memory when shuffle keys on a worker expire faster than cleanup removes them. The way `ChunkStreamManager` cleans up expired shuffle keys should therefore be optimized to avoid the leak, which can drive the worker's VM thread to 100% CPU.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`WorkerSuite#clean up`.
Closes #2053 from SteNicholas/CELEBORN-1094.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 4e8e8c2310225b4dd59c80f699073e108fd4abf9)
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 18 ++++++
docs/configuration/worker.md | 2 +
.../deploy/worker/storage/ChunkStreamManager.java | 29 +++------
.../celeborn/service/deploy/worker/Worker.scala | 37 +++++++-----
.../deploy/worker/storage/StorageManager.scala | 69 ++++++++++------------
.../deploy/worker/storage/WorkerSuite.scala | 9 ++-
6 files changed, 90 insertions(+), 74 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 73111c969..f5b1691ce 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -650,6 +650,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
def workerCommitThreads: Int =
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS)
+ def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
def partitionSorterEagerlyRemoveOriginalFilesEnabled: Boolean =
@@ -967,6 +968,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerDiskTimeSlidingWindowMinFetchCount: Int =
get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)
def workerDiskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
+ def workerDiskCleanThreads: Int = get(WORKER_DISK_CLEAN_THREADS)
def workerDiskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
def workerDiskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
def workerDiskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
@@ -2098,6 +2100,14 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("5G")
+ val WORKER_DISK_CLEAN_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.disk.clean.threads")
+ .categories("worker")
+ .version("0.3.2")
+ .doc("Thread number of worker to clean up directories of expired shuffle
keys on disk.")
+ .intConf
+ .createWithDefault(4)
+
val WORKER_CHECK_FILE_CLEAN_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.worker.storage.checkDirsEmpty.maxRetries")
.withAlternative("celeborn.worker.disk.checkFileClean.maxRetries")
@@ -2229,6 +2239,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(32)
+ val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.clean.threads")
+ .categories("worker")
+ .version("0.3.2")
+ .doc("Thread number of worker to clean up expired shuffle keys.")
+ .intConf
+ .createWithDefault(64)
+
val WORKER_SHUFFLE_COMMIT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.commitFiles.timeout")
.withAlternative("celeborn.worker.shuffle.commit.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 60cc9fc68..194d13d00 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -26,6 +26,7 @@ license: |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
| celeborn.worker.activeConnection.max | <undefined> | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 |
+| celeborn.worker.clean.threads | 64 | Thread number of worker to clean up expired shuffle keys. | 0.3.2 |
| celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
| celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
| celeborn.worker.commitFiles.timeout | 120s | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
@@ -42,6 +43,7 @@ license: |
| celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients. | 0.2.0 |
| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 |
| celeborn.worker.directMemoryRatioToResume | 0.7 | If direct memory usage is less than this limit, worker will resume. | 0.2.0 |
+| celeborn.worker.disk.clean.threads | 4 | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 |
| celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from worker to client when fetching data | 0.3.0 |
| celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 |
| celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 |
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
index 053e09ca7..98fd7d658 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
@@ -24,8 +24,6 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,9 +81,7 @@ public class ChunkStreamManager {
}
FileManagedBuffers buffers = state.buffers;
- ManagedBuffer nextChunk = buffers.chunk(chunkIndex, offset, len);
-
- return nextChunk;
+ return buffers.chunk(chunkIndex, offset, len);
}
public TimeWindow getFetchTimeMetric(long streamId) {
@@ -97,20 +93,6 @@ public class ChunkStreamManager {
}
}
- public static String genStreamChunkId(long streamId, int chunkId) {
- return String.format("%d_%d", streamId, chunkId);
- }
-
- // Parse streamChunkId to be stream id and chunk id. This is used when fetch remote chunk as a
- // stream.
- public static Pair<Long, Integer> parseStreamChunkId(String streamChunkId) {
- String[] array = streamChunkId.split("_");
- assert array.length == 2 : "Stream id and chunk index should be specified.";
- long streamId = Long.parseLong(array[0]);
- int chunkIndex = Integer.parseInt(array[1]);
- return ImmutablePair.of(streamId, chunkIndex);
- }
-
public void chunkBeingSent(long streamId) {
StreamState streamState = streams.get(streamId);
if (streamState != null) {
@@ -163,14 +145,21 @@ public class ChunkStreamManager {
}
public void cleanupExpiredShuffleKey(Set<String> expiredShuffleKeys) {
+ logger.info(
+ "Clean up expired shuffle keys {}",
+ String.join(",", expiredShuffleKeys.toArray(new String[0])));
for (String expiredShuffleKey : expiredShuffleKeys) {
Set<Long> expiredStreamIds = shuffleStreamIds.remove(expiredShuffleKey);
// normally expiredStreamIds set will be empty as streamId will be removed when be fully read
if (expiredStreamIds != null && !expiredStreamIds.isEmpty()) {
- streams.keySet().removeAll(expiredStreamIds);
+ expiredStreamIds.forEach(streams::remove);
}
}
+ logger.info(
+ "Cleaned up expired shuffle keys. The count of shuffle keys and
streams: {}, {}",
+ shuffleStreamIds.size(),
+ streams.size());
}
@VisibleForTesting
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index e85da40ec..c8f0f2b98 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -251,7 +251,11 @@ private[celeborn] class Worker(
val replicateThreadPool: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("worker-replicate-data",
conf.workerReplicateThreads)
val commitThreadPool: ThreadPoolExecutor =
- ThreadUtils.newDaemonCachedThreadPool("Worker-CommitFiles",
conf.workerCommitThreads)
+ ThreadUtils.newDaemonCachedThreadPool("worker-commit-files",
conf.workerCommitThreads)
+ val cleanThreadPool: ThreadPoolExecutor =
+ ThreadUtils.newDaemonCachedThreadPool(
+ "worker-clean-expired-shuffle-keys",
+ conf.workerCleanThreads)
val asyncReplyPool: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply")
val timer = new HashedWheelTimer()
@@ -400,7 +404,7 @@ private[celeborn] class Worker(
while (true) {
val expiredShuffleKeys = cleanTaskQueue.take()
try {
- cleanup(expiredShuffleKeys)
+ cleanup(expiredShuffleKeys, cleanThreadPool)
} catch {
case e: Throwable =>
logError("Cleanup failed", e)
@@ -512,20 +516,23 @@ private[celeborn] class Worker(
throw new CelebornException("Register worker failed.", exception)
}
@VisibleForTesting
- def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = synchronized {
- expiredShuffleKeys.asScala.foreach { shuffleKey =>
- partitionLocationInfo.removeShuffle(shuffleKey)
- shufflePartitionType.remove(shuffleKey)
- shufflePushDataTimeout.remove(shuffleKey)
- shuffleMapperAttempts.remove(shuffleKey)
- shuffleCommitInfos.remove(shuffleKey)
- workerInfo.releaseSlots(shuffleKey)
- logInfo(s"Cleaned up expired shuffle $shuffleKey")
+ def cleanup(expiredShuffleKeys: JHashSet[String], threadPool: ThreadPoolExecutor): Unit =
+ synchronized {
+ expiredShuffleKeys.asScala.foreach { shuffleKey =>
+ partitionLocationInfo.removeShuffle(shuffleKey)
+ shufflePartitionType.remove(shuffleKey)
+ shufflePushDataTimeout.remove(shuffleKey)
+ shuffleMapperAttempts.remove(shuffleKey)
+ shuffleCommitInfos.remove(shuffleKey)
+ workerInfo.releaseSlots(shuffleKey)
+ logInfo(s"Cleaned up expired shuffle $shuffleKey")
+ }
+ partitionsSorter.cleanup(expiredShuffleKeys)
+ fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
+ threadPool.execute(new Runnable {
+ override def run(): Unit = storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
+ })
}
- partitionsSorter.cleanup(expiredShuffleKeys)
- storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
- fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
- }
override def getWorkerInfo: String = {
val sb = new StringBuilder
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 8da9db9b6..27c225378 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator
+import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.iq80.leveldb.{DB, WriteOptions}
@@ -88,7 +89,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
diskInfo =>
cleaners.put(
diskInfo.mountPoint,
- ThreadUtils.newDaemonCachedThreadPool(s"Disk-cleaner-${diskInfo.mountPoint}", 1))
+ ThreadUtils.newDaemonCachedThreadPool(
+ s"disk-cleaner-${diskInfo.mountPoint}",
+ conf.workerDiskCleanThreads))
}
cleaners
}
@@ -156,7 +159,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
if (diskStatus == DiskStatus.CRITICAL_ERROR) {
- logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk
operator.")
+ logInfo(s"Disk $mountPoint faces critical error, will remove its disk
operator.")
val operator = diskOperators.remove(mountPoint)
if (operator != null) {
operator.shutdown()
@@ -168,7 +171,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (!diskOperators.containsKey(mountPoint)) {
diskOperators.put(
mountPoint,
- ThreadUtils.newDaemonCachedThreadPool(s"Disk-cleaner-${mountPoint}",
1))
+ ThreadUtils.newDaemonCachedThreadPool(
+ s"disk-cleaner-$mountPoint",
+ conf.workerDiskCleanThreads))
}
}
@@ -176,7 +181,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
private val counterOperator = new IntUnaryOperator() {
override def applyAsInt(operand: Int): Int = {
val dirs = healthyWorkingDirs()
- if (dirs.length > 0) {
+ if (dirs.nonEmpty) {
(operand + 1) % dirs.length
} else 0
}
@@ -251,12 +256,12 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val shuffleKey = parseDbShuffleKey(key)
try {
val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache)
- logDebug(s"Reload DB: ${shuffleKey} -> ${files}")
+ logDebug(s"Reload DB: $shuffleKey -> $files")
fileInfos.put(shuffleKey, files)
db.delete(entry.getKey)
} catch {
case exception: Exception =>
- logError(s"Reload DB: ${shuffleKey} failed.", exception)
+ logError(s"Reload DB: $shuffleKey failed.", exception)
}
} else {
return
@@ -520,7 +525,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val hdfsFileWriter = hdfsWriters.get(fileInfo.getFilePath)
if (hdfsFileWriter != null) {
hdfsFileWriter.destroy(new IOException(
- s"Destroy FileWriter ${hdfsFileWriter} caused by shuffle
${shuffleKey} expired."))
+ s"Destroy FileWriter $hdfsFileWriter caused by shuffle $shuffleKey
expired."))
hdfsWriters.remove(fileInfo.getFilePath)
}
} else {
@@ -531,7 +536,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val fileWriter = writers.get(fileInfo.getFilePath)
if (fileWriter != null) {
fileWriter.destroy(new IOException(
- s"Destroy FileWriter ${fileWriter} caused by shuffle ${shuffleKey}
expired."))
+ s"Destroy FileWriter $fileWriter caused by shuffle $shuffleKey
expired."))
writers.remove(fileInfo.getFilePath)
}
}
@@ -608,9 +613,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
.filter(diskInfo =>
diskInfo.status == DiskStatus.HEALTHY
|| diskInfo.status == DiskStatus.HIGH_DISK_USAGE)
- .map { case diskInfo =>
- (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles()))
- }
+ .map(diskInfo =>
+ (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles())))
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
@@ -626,34 +630,25 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
private def deleteDirectory(dir: File, threadPool: ThreadPoolExecutor): Unit = {
- val allContents = dir.listFiles
- if (allContents != null) {
- for (file <- allContents) {
- deleteDirectory(file, threadPool)
- }
+ if (dir.exists()) {
+ threadPool.submit(new Runnable {
+ override def run(): Unit = {
+ deleteDirectoryWithRetry(dir)
+ }
+ })
}
- threadPool.submit(new Runnable {
- override def run(): Unit = {
- deleteFileWithRetry(dir)
- }
- })
}
- private def deleteFileWithRetry(file: File): Unit = {
- if (file.exists()) {
- var retryCount = 0
- var deleteSuccess = false
- while (!deleteSuccess && retryCount <= 3) {
- deleteSuccess = file.delete()
- retryCount = retryCount + 1
- if (!deleteSuccess) {
- Thread.sleep(200 * retryCount)
- }
- }
- if (deleteSuccess) {
- logDebug(s"Deleted expired shuffle file $file.")
- } else {
- logWarning(s"Failed to delete expired shuffle file $file.")
+ private def deleteDirectoryWithRetry(dir: File): Unit = {
+ var retryCount = 0
+ var deleteSuccess = false
+ while (!deleteSuccess && retryCount <= 3) {
+ try {
+ FileUtils.deleteDirectory(dir)
+ deleteSuccess = true
+ } catch {
+ case _: IOException =>
+ retryCount = retryCount + 1
}
}
}
@@ -693,7 +688,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
retryTimes += 1
if (retryTimes < conf.workerCheckFileCleanMaxRetries) {
logInfo(s"Working directory's files have not been cleaned up
completely, " +
- s"will start ${retryTimes + 1}th attempt after
${workerCheckFileCleanTimeout} milliseconds.")
+ s"will start ${retryTimes + 1}th attempt after
$workerCheckFileCleanTimeout milliseconds.")
}
Thread.sleep(workerCheckFileCleanTimeout)
}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 1fde28f0b..d5f75380b 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType}
-import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils}
+import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils, ThreadUtils}
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
@@ -83,7 +83,12 @@ class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
val shuffleKey2 = "2-2"
expiredShuffleKeys.add(shuffleKey1)
expiredShuffleKeys.add(shuffleKey2)
- worker.cleanup(expiredShuffleKeys)
+ worker.cleanup(
+ expiredShuffleKeys,
+ ThreadUtils.newDaemonCachedThreadPool(
+ "worker-clean-expired-shuffle-keys",
+ conf.workerCleanThreads))
+ Thread.sleep(3000)
worker.storageManager.workingDirWriters.values().asScala.map(t => assert(t.size() == 0))
}