This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7fc4f241f [CELEBORN-2146] Setting the DFS replication factor for
balanced fault tolerance and storage efficiency
7fc4f241f is described below
commit 7fc4f241f792774190c7724ae716345603c65d35
Author: xxx <[email protected]>
AuthorDate: Sat Sep 20 15:20:50 2025 +0800
[CELEBORN-2146] Setting the DFS replication factor for balanced fault
tolerance and storage efficiency
### What changes were proposed in this pull request?
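Add the worker configuration `celeborn.worker.hdfs.replication.factor`
(default 2) and apply it as the DFS replication factor of shuffle data and
index files when the worker creates them, for balanced fault tolerance and
storage efficiency.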
### Why are the changes needed?
The replication factor determines the number of redundant copies of data
maintained across multiple nodes, ensuring fault tolerance and data
availability in case of node failures.
### Does this PR introduce _any_ user-facing change?
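Yes. It adds the worker configuration
`celeborn.worker.hdfs.replication.factor` (default 2), documented in
docs/configuration/worker.md.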
### How was this patch tested?
CI
Closes #3472 from xy2953396112/CELEBORN-2146.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++
docs/configuration/worker.md | 1 +
.../celeborn/service/deploy/worker/storage/TierWriter.scala | 4 ++++
3 files changed, 16 insertions(+)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 19cc15982..9b2d044aa 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -690,6 +690,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED)
def workerFlushReuseCopyBufferEnabled: Boolean =
get(WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED)
+ def workerDfsReplicationFactor: Int =
+ get(WORKER_DFS_REPLICATION_FACTOR)
+
def clusterName: String = get(CLUSTER_NAME)
// //////////////////////////////////////////////////////
@@ -6687,4 +6690,12 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefaultString("true")
+ val WORKER_DFS_REPLICATION_FACTOR: ConfigEntry[Int] =
+ buildConf("celeborn.worker.hdfs.replication.factor")
+ .categories("worker")
+ .version("0.7.0")
+ .doc("HDFS replication factor for shuffle files.")
+ .intConf
+ .createWithDefault(2)
+
}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 72343249e..4c688fff3 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -106,6 +106,7 @@ license: |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s |
false | Interval for a Celeborn worker to flush committed file infos into Level
DB. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false
| Whether to call sync method to save committed file infos into Level DB to
handle OS crash. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's
graceful shutdown timeout time. | 0.2.0 | |
+| celeborn.worker.hdfs.replication.factor | 2 | false | HDFS replication
factor for shuffle files. | 0.7.0 | |
| celeborn.worker.http.auth.administers | | false | A comma-separated list of
users who have admin privileges, Note, when
celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as
administrator. | 0.6.0 | |
| celeborn.worker.http.auth.basic.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined password authentication implementation of
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0
| |
| celeborn.worker.http.auth.bearer.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined token authentication implementation of
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 |
|
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index f4b5e19f9..93a6c2989 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -536,6 +536,7 @@ class DfsTierWriter(
try {
hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+ hadoopFs.setReplication(dfsFileInfo.getDfsPath,
conf.workerDfsReplicationFactor.toShort);
if (dfsFileInfo.isS3) {
val uri = hadoopFs.getUri
val bucketName = uri.getHost
@@ -654,6 +655,9 @@ class DfsTierWriter(
hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
if (dfsFileInfo.isReduceFileMeta) {
val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
+ hadoopFs.setReplication(
+ dfsFileInfo.getDfsIndexPath,
+ conf.workerDfsReplicationFactor.toShort)
val byteStream: ByteArrayOutputStream = new ByteArrayOutputStream()
val dataStream = new DataOutputStream(byteStream)
try {