This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a596bbed [CELEBORN-1469] Support writing shuffle data to OSS(S3 only)
7a596bbed is described below

commit 7a596bbed1f94ef0ad983b51b9f4ebe41053075e
Author: zhaohehuhu <[email protected]>
AuthorDate: Wed Jul 24 11:59:15 2024 +0800

    [CELEBORN-1469] Support writing shuffle data to OSS(S3 only)
    
    ### What changes were proposed in this pull request?
    
    as title
    
    ### Why are the changes needed?
    
    Now, Celeborn doesn't support sinking shuffle data directly to Amazon S3, 
which could be a limitation when we're trying to move on-premises servers to 
AWS and use S3 as a data sink for shuffled data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Closes #2579 from zhaohehuhu/dev-0619.
    
    Authored-by: zhaohehuhu <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 LICENSE-binary                                     |   2 +
 NOTICE-binary                                      |   4 +
 build/make-distribution.sh                         |  10 +-
 .../org/apache/celeborn/client/ShuffleClient.java  |  18 +--
 .../celeborn/client/read/DfsPartitionReader.java   |  54 +++++----
 common/pom.xml                                     |  20 ++++
 .../apache/celeborn/common/meta/DiskFileInfo.java  |  40 ++++---
 .../celeborn/common/protocol/StorageInfo.java      |  21 +++-
 common/src/main/proto/TransportMessages.proto      |   2 +-
 .../org/apache/celeborn/common/CelebornConf.scala  |  82 ++++++++++++-
 .../apache/celeborn/common/meta/WorkerInfo.scala   |  62 +++++-----
 .../common/protocol/message/ControlMessages.scala  |  10 +-
 .../celeborn/common/util/CelebornHadoopUtils.scala |  36 +++++-
 .../org/apache/celeborn/common/util/Utils.scala    |   7 +-
 dev/deps/dependencies-server                       |   2 +
 docs/configuration/client.md                       |   4 +
 docs/configuration/master.md                       |   5 +
 docs/configuration/worker.md                       |   6 +
 master/pom.xml                                     |  20 ++++
 .../service/deploy/master/SlotsAllocator.java      |  20 +++-
 .../master/clustermeta/AbstractMetaManager.java    |   8 +-
 .../celeborn/service/deploy/master/Master.scala    |  49 +++++---
 .../src/main/openapi3/worker_rest_v1.yaml          |   1 +
 pom.xml                                            |  20 +++-
 project/CelebornBuild.scala                        |  12 +-
 .../server/common/http/api/v1/ApiUtils.scala       |   2 +
 .../deploy/worker/memory/MemoryManager.java        |   2 +-
 .../worker/storage/MapPartitionDataWriter.java     |  45 +++----
 .../deploy/worker/storage/PartitionDataWriter.java |  45 ++++---
 .../worker/storage/PartitionFilesSorter.java       | 102 +++++++++-------
 .../worker/storage/ReducePartitionDataWriter.java  |  13 +-
 .../service/deploy/worker/Controller.scala         |   2 +-
 .../service/deploy/worker/FetchHandler.scala       |   6 +
 .../service/deploy/worker/PushDataHandler.scala    |   5 +-
 .../celeborn/service/deploy/worker/Worker.scala    |   2 +-
 .../service/deploy/worker/storage/FlushTask.scala  |  35 +++++-
 .../service/deploy/worker/storage/Flusher.scala    |  19 +++
 .../deploy/worker/storage/StorageManager.scala     | 131 ++++++++++++++++-----
 .../deploy/worker/storage/StoragePolicy.scala      |   2 +-
 .../MemoryReducePartitionDataWriterSuiteJ.java     |   2 +-
 40 files changed, 685 insertions(+), 243 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 221f632dd..eabd3db4a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -263,6 +263,7 @@ org.apache.commons:commons-crypto
 org.apache.commons:commons-lang3
 org.apache.hadoop:hadoop-client-api
 org.apache.hadoop:hadoop-client-runtime
+org.apache.hadoop:hadoop-aws
 org.apache.ibatis:mybatis
 org.apache.logging.log4j:log4j-1.2-api
 org.apache.logging.log4j:log4j-api
@@ -307,6 +308,7 @@ org.slf4j:jcl-over-slf4j
 org.webjars:swagger-ui
 org.xerial.snappy:snappy-java
 org.yaml:snakeyaml
+com.amazonaws:aws-java-sdk-bundle
 
 
------------------------------------------------------------------------------------
 This product bundles various third-party components under other open source 
licenses.
diff --git a/NOTICE-binary b/NOTICE-binary
index e4f0de0af..9942e1440 100644
--- a/NOTICE-binary
+++ b/NOTICE-binary
@@ -202,3 +202,7 @@ Copyright (c) 2022 Luke Hutchison
 
 mimepool
 Copyright (c) 2018, 2022 Oracle and/or its affiliates.
+
+
+aws-java-sdk
+Copyright 2010-2024 Amazon.com, Inc. or its affiliates.
\ No newline at end of file
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index e5e39ec6e..1f093bd12 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -27,6 +27,7 @@ RELEASE="false"
 MVN="$PROJECT_DIR/build/mvn"
 SBT="$PROJECT_DIR/build/sbt"
 SBT_ENABLED="false"
+HADOOP_AWS_ENABLED="false"
 
 function exit_with_usage {
   echo "make-distribution.sh - tool for making binary distributions of 
Celeborn"
@@ -62,6 +63,11 @@ while (( "$#" )); do
       echo "Error: $1 is not supported"
       exit_with_usage
       ;;
+    -P*)
+      if [[ "$1" == *"hadoop-aws"* ]]; then
+        HADOOP_AWS_ENABLED="true"
+      fi
+      ;;
     -*)
       break
       ;;
@@ -256,7 +262,9 @@ function sbt_build_service {
 
   echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
   echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
-
+  if [[ "$HADOOP_AWS_ENABLED" == "true" ]]; then
+    export SBT_MAVEN_PROFILES="hadoop-aws"
+  fi
   BUILD_COMMAND=("$SBT" clean package)
 
   # Actually build the jar
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index 0ea484deb..0738368ff 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -19,6 +19,7 @@ package org.apache.celeborn.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -34,6 +35,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.network.client.TransportClientFactory;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.PbStreamHandler;
+import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.rpc.RpcEndpointRef;
 import org.apache.celeborn.common.util.CelebornHadoopUtils;
 import org.apache.celeborn.common.util.ExceptionMaker;
@@ -47,7 +49,7 @@ public abstract class ShuffleClient {
   private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class);
   private static volatile ShuffleClient _instance;
   private static volatile boolean initialized = false;
-  private static volatile FileSystem hdfsFs;
+  private static volatile Map<StorageInfo.Type, FileSystem> hadoopFs;
   private static LongAdder totalReadCounter = new LongAdder();
   private static LongAdder localShuffleReadCounter = new LongAdder();
 
@@ -55,7 +57,7 @@ public abstract class ShuffleClient {
   public static void reset() {
     _instance = null;
     initialized = false;
-    hdfsFs = null;
+    hadoopFs = null;
   }
 
   protected ShuffleClient() {}
@@ -101,19 +103,19 @@ public abstract class ShuffleClient {
     return _instance;
   }
 
-  public static FileSystem getHdfsFs(CelebornConf conf) {
-    if (null == hdfsFs) {
+  public static Map<StorageInfo.Type, FileSystem> getHadoopFs(CelebornConf 
conf) {
+    if (null == hadoopFs) {
       synchronized (ShuffleClient.class) {
-        if (null == hdfsFs) {
+        if (null == hadoopFs) {
           try {
-            hdfsFs = CelebornHadoopUtils.getHadoopFS(conf);
+            hadoopFs = CelebornHadoopUtils.getHadoopFS(conf);
           } catch (Exception e) {
-            logger.error("Celeborn initialize HDFS failed.", e);
+            logger.error("Celeborn initialize DFS failed.", e);
           }
         }
       }
     }
-    return hdfsFs;
+    return hadoopFs;
   }
 
   public static void incrementLocalReadCounter() {
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 712bd82e4..b69cf580f 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCounted;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.PbBufferStreamEnd;
 import org.apache.celeborn.common.protocol.PbOpenStream;
 import org.apache.celeborn.common.protocol.PbStreamHandler;
+import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.protocol.StreamType;
 import org.apache.celeborn.common.util.ShuffleBlockInfoUtils;
 import org.apache.celeborn.common.util.ThreadUtils;
@@ -60,7 +62,7 @@ public class DfsPartitionReader implements PartitionReader {
   private volatile boolean closed = false;
   private ExecutorService fetchThread;
   private boolean fetchThreadStarted;
-  private FSDataInputStream hdfsInputStream;
+  private FSDataInputStream dfsInputStream;
   private int numChunks = 0;
   private int returnedChunks = 0;
   private int currentChunkIndex = 0;
@@ -68,6 +70,7 @@ public class DfsPartitionReader implements PartitionReader {
   private TransportClient client;
   private PbStreamHandler streamHandler;
   private MetricsCallback metricsCallback;
+  private FileSystem hadoopFs;
 
   public DfsPartitionReader(
       CelebornConf conf,
@@ -85,6 +88,12 @@ public class DfsPartitionReader implements PartitionReader {
 
     this.metricsCallback = metricsCallback;
     this.location = location;
+    if (location.getStorageInfo() != null
+        && location.getStorageInfo().getType() == StorageInfo.Type.S3) {
+      this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+    } else {
+      this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
+    }
 
     if (endMapIndex != Integer.MAX_VALUE) {
       long fetchTimeoutMs = conf.clientFetchTimeoutMs();
@@ -105,18 +114,17 @@ public class DfsPartitionReader implements 
PartitionReader {
         // Parse this message to ensure sort is done.
       } catch (IOException | InterruptedException e) {
         throw new IOException(
-            "read shuffle file from HDFS failed, filePath: "
+            "read shuffle file from DFS failed, filePath: "
                 + location.getStorageInfo().getFilePath(),
             e);
       }
-      hdfsInputStream =
-          ShuffleClient.getHdfsFs(conf)
-              .open(new 
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+
+      dfsInputStream =
+          hadoopFs.open(new 
Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
       chunkOffsets.addAll(
           getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, 
endMapIndex));
     } else {
-      hdfsInputStream =
-          ShuffleClient.getHdfsFs(conf).open(new 
Path(location.getStorageInfo().getFilePath()));
+      dfsInputStream = hadoopFs.open(new 
Path(location.getStorageInfo().getFilePath()));
       chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
     }
     logger.debug(
@@ -138,8 +146,7 @@ public class DfsPartitionReader implements PartitionReader {
       throws IOException {
     List<Long> offsets;
     try (FSDataInputStream indexInputStream =
-        ShuffleClient.getHdfsFs(conf)
-            .open(new 
Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
+        hadoopFs.open(new 
Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
       offsets = new ArrayList<>();
       int offsetCount = indexInputStream.readInt();
       for (int i = 0; i < offsetCount; i++) {
@@ -154,10 +161,9 @@ public class DfsPartitionReader implements PartitionReader 
{
       throws IOException {
     String indexPath = 
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
     List<Long> offsets;
-    try (FSDataInputStream indexInputStream =
-        ShuffleClient.getHdfsFs(conf).open(new Path(indexPath))) {
+    try (FSDataInputStream indexInputStream = hadoopFs.open(new 
Path(indexPath))) {
       logger.debug("read sorted index {}", indexPath);
-      long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new 
Path(indexPath)).getLen();
+      long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen();
       // Index size won't be large, so it's safe to do the conversion.
       byte[] indexBuffer = new byte[(int) indexSize];
       indexInputStream.readFully(0L, indexBuffer);
@@ -196,24 +202,22 @@ public class DfsPartitionReader implements 
PartitionReader {
                 logger.debug("read {} offset {} length {}", currentChunkIndex, 
offset, length);
                 byte[] buffer = new byte[(int) length];
                 try {
-                  hdfsInputStream.readFully(offset, buffer);
+                  dfsInputStream.readFully(offset, buffer);
                 } catch (IOException e) {
                   logger.warn(
-                      "read HDFS {} failed will retry, error detail {}",
+                      "read DFS {} failed will retry, error detail {}",
                       location.getStorageInfo().getFilePath(),
                       e);
                   try {
-                    hdfsInputStream.close();
-                    hdfsInputStream =
-                        ShuffleClient.getHdfsFs(conf)
-                            .open(
-                                new Path(
-                                    Utils.getSortedFilePath(
-                                        
location.getStorageInfo().getFilePath())));
-                    hdfsInputStream.readFully(offset, buffer);
+                    dfsInputStream.close();
+                    dfsInputStream =
+                        hadoopFs.open(
+                            new Path(
+                                
Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+                    dfsInputStream.readFully(offset, buffer);
                   } catch (IOException ex) {
                     logger.warn(
-                        "retry read HDFS {} failed, error detail {} ",
+                        "retry read DFS {} failed, error detail {} ",
                         location.getStorageInfo().getFilePath(),
                         e);
                     exception.set(ex);
@@ -261,9 +265,9 @@ public class DfsPartitionReader implements PartitionReader {
       fetchThread.shutdownNow();
     }
     try {
-      hdfsInputStream.close();
+      dfsInputStream.close();
     } catch (IOException e) {
-      logger.warn("close HDFS input stream failed.", e);
+      logger.warn("close DFS input stream failed.", e);
     }
     if (results.size() > 0) {
       results.forEach(ReferenceCounted::release);
diff --git a/common/pom.xml b/common/pom.xml
index 29baba769..9db43a17b 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -204,5 +204,25 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>hadoop-aws</id>
+      <activation>
+        <property>
+          <name>hadoop-aws-deps</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+          <version>${aws.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>
diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index ce99192c6..51978ac2e 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -96,41 +96,41 @@ public class DiskFileInfo extends FileInfo {
     return Utils.getIndexFilePath(filePath);
   }
 
-  public Path getHdfsPath() {
+  public Path getDfsPath() {
     return new Path(filePath);
   }
 
-  public Path getHdfsIndexPath() {
+  public Path getDfsIndexPath() {
     return new Path(Utils.getIndexFilePath(filePath));
   }
 
-  public Path getHdfsSortedPath() {
+  public Path getDfsSortedPath() {
     return new Path(Utils.getSortedFilePath(filePath));
   }
 
-  public Path getHdfsWriterSuccessPath() {
+  public Path getDfsWriterSuccessPath() {
     return new Path(Utils.getWriteSuccessFilePath(filePath));
   }
 
-  public Path getHdfsPeerWriterSuccessPath() {
+  public Path getDfsPeerWriterSuccessPath() {
     return new 
Path(Utils.getWriteSuccessFilePath(Utils.getPeerPath(filePath)));
   }
 
-  public void deleteAllFiles(FileSystem hdfsFs) {
-    if (isHdfs()) {
+  public void deleteAllFiles(FileSystem dfsFs) {
+    if (isDFS()) {
       try {
-        hdfsFs.delete(getHdfsPath(), false);
-        hdfsFs.delete(getHdfsWriterSuccessPath(), false);
-        hdfsFs.delete(getHdfsIndexPath(), false);
-        hdfsFs.delete(getHdfsSortedPath(), false);
+        dfsFs.delete(getDfsPath(), false);
+        dfsFs.delete(getDfsWriterSuccessPath(), false);
+        dfsFs.delete(getDfsIndexPath(), false);
+        dfsFs.delete(getDfsSortedPath(), false);
       } catch (Exception e) {
         // ignore delete exceptions because some other workers might be 
deleting the directory
         logger.debug(
-            "delete HDFS file {},{},{},{} failed {}",
-            getHdfsPath(),
-            getHdfsWriterSuccessPath(),
-            getHdfsIndexPath(),
-            getHdfsSortedPath(),
+            "delete DFS file {},{},{},{} failed {}",
+            getDfsPath(),
+            getDfsWriterSuccessPath(),
+            getDfsIndexPath(),
+            getDfsSortedPath(),
             e);
       }
     } else {
@@ -151,4 +151,12 @@ public class DiskFileInfo extends FileInfo {
   public boolean isHdfs() {
     return Utils.isHdfsPath(filePath);
   }
+
+  public boolean isS3() {
+    return Utils.isS3Path(filePath);
+  }
+
+  public boolean isDFS() {
+    return Utils.isS3Path(filePath) || Utils.isHdfsPath(filePath);
+  }
 }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java 
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index 621edb774..28cb65256 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -26,7 +26,9 @@ public class StorageInfo implements Serializable {
     HDD(1),
     SSD(2),
     HDFS(3),
-    OSS(4);
+    OSS(4),
+    S3(5);
+
     private final int value;
 
     Type(int value) {
@@ -54,6 +56,7 @@ public class StorageInfo implements Serializable {
   public static final int LOCAL_DISK_MASK = 0b10;
   public static final int HDFS_MASK = 0b100;
   public static final int OSS_MASK = 0b1000;
+  public static final int S3_MASK = 0b10000;
   public static final int ALL_TYPES_AVAILABLE_MASK = 0;
 
   // Default storage Type is MEMORY.
@@ -162,15 +165,28 @@ public class StorageInfo implements Serializable {
     return StorageInfo.HDFSOnly(availableStorageTypes);
   }
 
+  public static boolean S3Only(int availableStorageTypes) {
+    return availableStorageTypes == S3_MASK;
+  }
+
   public static boolean OSSAvailable(int availableStorageTypes) {
     return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
         || (availableStorageTypes & OSS_MASK) > 0;
   }
 
+  public static boolean S3Available(int availableStorageTypes) {
+    return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+        || (availableStorageTypes & S3_MASK) > 0;
+  }
+
   public boolean OSSAvailable() {
     return StorageInfo.OSSAvailable(availableStorageTypes);
   }
 
+  public boolean S3Available() {
+    return StorageInfo.S3Available(availableStorageTypes);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -232,6 +248,9 @@ public class StorageInfo implements Serializable {
         case OSS:
           ava = ava | OSS_MASK;
           break;
+        case S3:
+          ava = ava | S3_MASK;
+          break;
       }
     }
     return ava;
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index f8de50486..2b8432c44 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -70,7 +70,7 @@ enum MessageType {
   PARTITION_SPLIT = 47;
   REGISTER_MAP_PARTITION_TASK = 48;
   HEARTBEAT_FROM_APPLICATION_RESPONSE = 49;
-  CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
+  CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT = 50;
   OPEN_STREAM = 51;
   STREAM_HANDLER = 52;
   CHECK_WORKERS_AVAILABLE = 53;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9f05c96bf..ff21a311f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.util.Try
 
+import org.apache.celeborn.common.CelebornConf.{MASTER_INTERNAL_ENDPOINTS, 
S3_ACCESS_KEY, S3_DIR, S3_SECRET_KEY}
 import 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
 import org.apache.celeborn.common.identity.{DefaultIdentityProvider, 
IdentityProvider}
 import org.apache.celeborn.common.internal.Logging
@@ -641,6 +642,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   }
   def hasHDFSStorage: Boolean =
     get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && 
get(HDFS_DIR).isDefined
+  def hasS3Storage: Boolean =
+    get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.S3.name()) && 
get(S3_DIR).isDefined
   def masterSlotAssignLoadAwareDiskGroupNum: Int = 
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
   def masterSlotAssignLoadAwareDiskGroupGradient: Double =
     get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT)
@@ -886,6 +889,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def clientCommitFilesIgnoreExcludedWorkers: Boolean = 
get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
   def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
   def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
+  def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT)
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
   def applicationUnregisterEnabled: Boolean = 
get(APPLICATION_UNREGISTER_ENABLED)
 
@@ -1093,7 +1097,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
         (dir, maxCapacity, flushThread, diskType)
       }
     }.getOrElse {
-      if (!hasHDFSStorage) {
+      if (!hasHDFSStorage && !hasS3Storage) {
         val prefix = workerStorageBaseDirPrefix
         val number = workerStorageBaseDirNumber
         (1 to number).map { i =>
@@ -1108,6 +1112,24 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE)
   def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE)
 
+  def s3AccessKey: String = get(S3_ACCESS_KEY).getOrElse("")
+
+  def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
+
+  def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("")
+
+  def s3Dir: String = {
+    get(S3_DIR).map {
+      s3Dir =>
+        if (!Utils.isS3Path(s3Dir)) {
+          log.error(s"${S3_DIR.key} configuration is wrong $s3Dir. Disable S3 
support.")
+          ""
+        } else {
+          s3Dir
+        }
+    }.getOrElse("")
+  }
+
   def hdfsDir: String = {
     get(HDFS_DIR).map {
       hdfsDir =>
@@ -1177,10 +1199,12 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   // //////////////////////////////////////////////////////
   def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE)
   def workerHdfsFlusherBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
+  def workerS3FlusherBufferSize: Long = get(WORKER_S3_FLUSHER_BUFFER_SIZE)
   def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
   def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
   def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
   def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
+  def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
   def workerCreateWriterMaxAttempts: Int = 
get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
 
   // //////////////////////////////////////////////////////
@@ -2134,6 +2158,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1h")
 
+  val DFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.dfs.expireDirs.timeout")
+      .categories("master")
+      .version("0.6.0")
+      .doc("The timeout for a expire dirs to be deleted on S3 or HDFS.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1h")
+
   val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.master.heartbeat.worker.timeout")
       .withAlternative("celeborn.worker.heartbeat.timeout")
@@ -2801,6 +2833,38 @@ object CelebornConf extends Logging {
       .stringConf
       .createOptional
 
+  val S3_DIR: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.s3.dir")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("S3 base directory for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
+  val S3_SECRET_KEY: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.s3.secret.key")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("S3 secret key for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
+  val S3_ACCESS_KEY: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.s3.access.key")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("S3 access key for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
+  val S3_ENDPOINT: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.s3.endpoint")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("S3 endpoint for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
   val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.worker.storage.disk.reserve.size")
       .withAlternative("celeborn.worker.disk.reserve.size")
@@ -3219,6 +3283,14 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("4m")
 
+  val WORKER_S3_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.worker.flusher.s3.buffer.size")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("Size of buffer used by a S3 flusher.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("4m")
+
   val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.writer.close.timeout")
       .categories("worker")
@@ -3259,6 +3331,14 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(8)
 
+  val WORKER_FLUSHER_S3_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.worker.flusher.s3.threads")
+      .categories("worker")
+      .doc("Flusher's thread count used for write data to S3.")
+      .version("0.6.0")
+      .intConf
+      .createWithDefault(8)
+
   val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.flusher.shutdownTimeout")
       .categories("worker")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index a0eedc3f9..bc801b7c8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -46,7 +46,8 @@ class WorkerInfo(
   var lastHeartbeat: Long = 0
   var workerStatus = WorkerStatus.normalWorkerStatus()
   val diskInfos =
-    if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, 
DiskInfo](_diskInfos) else null
+    if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, 
DiskInfo](_diskInfos)
+    else null
   val userResourceConsumption =
     if (_userResourceConsumption != null)
       JavaUtils.newConcurrentHashMap[UserIdentifier, 
ResourceConsumption](_userResourceConsumption)
@@ -198,40 +199,41 @@ class WorkerInfo(
 
   def updateThenGetDiskInfos(
       newDiskInfos: java.util.Map[String, DiskInfo],
-      estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] 
= this.synchronized {
-    import scala.collection.JavaConverters._
-    for (newDisk <- newDiskInfos.values().asScala) {
-      val mountPoint: String = newDisk.mountPoint
-      val curDisk = diskInfos.get(mountPoint)
-      if (curDisk != null) {
-        curDisk.actualUsableSpace = newDisk.actualUsableSpace
-        curDisk.totalSpace = newDisk.totalSpace
-        // Update master's diskinfo activeslots to worker's value
-        curDisk.activeSlots = newDisk.activeSlots
-        curDisk.avgFlushTime = newDisk.avgFlushTime
-        curDisk.avgFetchTime = newDisk.avgFetchTime
-        if (estimatedPartitionSize.nonEmpty && curDisk.storageType != 
StorageInfo.Type.HDFS) {
-          curDisk.maxSlots = curDisk.actualUsableSpace / 
estimatedPartitionSize.get
-        }
-        curDisk.setStatus(newDisk.status)
-      } else {
-        if (estimatedPartitionSize.nonEmpty && newDisk.storageType != 
StorageInfo.Type.HDFS) {
-          newDisk.maxSlots = newDisk.actualUsableSpace / 
estimatedPartitionSize.get
+      estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] 
=
+    this.synchronized {
+      import scala.collection.JavaConverters._
+      for (newDisk <- newDiskInfos.values().asScala) {
+        val mountPoint: String = newDisk.mountPoint
+        val curDisk = diskInfos.get(mountPoint)
+        if (curDisk != null) {
+          curDisk.actualUsableSpace = newDisk.actualUsableSpace
+          curDisk.totalSpace = newDisk.totalSpace
+          // Update master's diskinfo activeslots to worker's value
+          curDisk.activeSlots = newDisk.activeSlots
+          curDisk.avgFlushTime = newDisk.avgFlushTime
+          curDisk.avgFetchTime = newDisk.avgFetchTime
+          if (estimatedPartitionSize.nonEmpty && curDisk.storageType != 
StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) {
+            curDisk.maxSlots = curDisk.actualUsableSpace / 
estimatedPartitionSize.get
+          }
+          curDisk.setStatus(newDisk.status)
+        } else {
+          if (estimatedPartitionSize.nonEmpty && newDisk.storageType != 
StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) {
+            newDisk.maxSlots = newDisk.actualUsableSpace / 
estimatedPartitionSize.get
+          }
+          diskInfos.put(mountPoint, newDisk)
         }
-        diskInfos.put(mountPoint, newDisk)
       }
-    }
 
-    val nonExistsMountPoints: java.util.Set[String] = new util.HashSet[String]
-    nonExistsMountPoints.addAll(diskInfos.keySet)
-    nonExistsMountPoints.removeAll(newDiskInfos.keySet)
-    if (!nonExistsMountPoints.isEmpty) {
-      for (nonExistsMountPoint <- nonExistsMountPoints.asScala) {
-        diskInfos.remove(nonExistsMountPoint)
+      val nonExistsMountPoints: java.util.Set[String] = new 
util.HashSet[String]
+      nonExistsMountPoints.addAll(diskInfos.keySet)
+      nonExistsMountPoints.removeAll(newDiskInfos.keySet)
+      if (!nonExistsMountPoints.isEmpty) {
+        for (nonExistsMountPoint <- nonExistsMountPoints.asScala) {
+          diskInfos.remove(nonExistsMountPoint)
+        }
       }
+      JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
     }
-    JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
-  }
 
   def updateThenGetUserResourceConsumption(resourceConsumptions: util.Map[
     UserIdentifier,
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 94d4a22a8..c94744bb4 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -68,7 +68,7 @@ object ControlMessages extends Logging {
 
   case object CheckForWorkerUnavailableInfoTimeout extends Message
 
-  case object CheckForHDFSExpiredDirsTimeout extends Message
+  case object CheckForDFSExpiredDirsTimeout extends Message
 
   case object RemoveExpiredShuffle extends Message
 
@@ -509,8 +509,8 @@ object ControlMessages extends Logging {
     case CheckForApplicationTimeOut =>
       new TransportMessage(MessageType.CHECK_APPLICATION_TIMEOUT, null)
 
-    case CheckForHDFSExpiredDirsTimeout =>
-      new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT, 
null)
+    case CheckForDFSExpiredDirsTimeout =>
+      new TransportMessage(MessageType.CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT, 
null)
 
     case RemoveExpiredShuffle =>
       new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null)
@@ -1263,8 +1263,8 @@ object ControlMessages extends Logging {
       case CHECK_APPLICATION_TIMEOUT_VALUE =>
         CheckForApplicationTimeOut
 
-      case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
-        CheckForHDFSExpiredDirsTimeout
+      case CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
+        CheckForDFSExpiredDirsTimeout
 
       case WORKER_LOST_VALUE =>
         PbWorkerLost.parseFrom(message.getPayload)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index ae07187e7..cf15709a7 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -20,6 +20,8 @@ package org.apache.celeborn.common.util
 import java.io.{File, IOException}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
@@ -27,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.StorageInfo
 
 object CelebornHadoopUtils extends Logging {
   private var logPrinted = new AtomicBoolean(false)
@@ -46,6 +49,20 @@ object CelebornHadoopUtils extends Logging {
             "prefix 'celeborn.hadoop.', e.g. 
'celeborn.hadoop.dfs.replication=3'")
       }
     }
+
+    if (conf.s3Dir.nonEmpty) {
+      if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || 
conf.s3Endpoint.isEmpty) {
+        throw new CelebornException(
+          "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint 
is not set")
+      }
+      hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+      hadoopConf.set(
+        "fs.s3a.aws.credentials.provider",
+        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+      hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
+      hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
+      hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint)
+    }
     appendSparkHadoopConfigs(conf, hadoopConf)
     hadoopConf
   }
@@ -57,22 +74,31 @@ object CelebornHadoopUtils extends Logging {
     }
   }
 
-  def getHadoopFS(conf: CelebornConf): FileSystem = {
+  def getHadoopFS(conf: CelebornConf): java.util.Map[StorageInfo.Type, 
FileSystem] = {
     val hadoopConf = newConfiguration(conf)
     initKerberos(conf, hadoopConf)
-    new Path(conf.hdfsDir).getFileSystem(hadoopConf)
+    val hadoopFs = new java.util.HashMap[StorageInfo.Type, FileSystem]()
+    if (conf.hasHDFSStorage) {
+      val hdfsDir = new Path(conf.hdfsDir)
+      hadoopFs.put(StorageInfo.Type.HDFS, hdfsDir.getFileSystem(hadoopConf))
+    }
+    if (conf.hasS3Storage) {
+      val s3Dir = new Path(conf.s3Dir)
+      hadoopFs.put(StorageInfo.Type.S3, s3Dir.getFileSystem(hadoopConf))
+    }
+    hadoopFs
   }
 
-  def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: 
Boolean): Unit = {
+  def deleteDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: 
Boolean): Unit = {
     try {
       val startTime = System.currentTimeMillis()
       hadoopFs.delete(path, recursive)
       logInfo(
-        s"Delete HDFS ${path}(recursive=$recursive) costs " +
+        s"Delete DFS ${path}(recursive=$recursive) costs " +
           Utils.msDurationToString(System.currentTimeMillis() - startTime))
     } catch {
       case e: IOException =>
-        logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to: 
", e)
+        logError(s"Failed to delete DFS ${path}(recursive=$recursive) due to: 
", e)
     }
   }
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 494c77ab9..d390561d7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1119,7 +1119,8 @@ object Utils extends Logging {
   val SORTED_SUFFIX = ".sorted"
   val INDEX_SUFFIX = ".index"
   val SUFFIX_HDFS_WRITE_SUCCESS = ".success"
-  val COMPATIBLE_HDFS_REGEX = "^[a-zA-Z0-9]+://.*"
+  val COMPATIBLE_HDFS_REGEX = "^(?!s3a://)[a-zA-Z0-9]+://.*"
+  val S3_REGEX = "^s3[a]?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$"
 
   val UNKNOWN_APP_SHUFFLE_ID = -1
 
@@ -1127,6 +1128,10 @@ object Utils extends Logging {
     path.matches(COMPATIBLE_HDFS_REGEX)
   }
 
+  def isS3Path(path: String): Boolean = {
+    path.matches(S3_REGEX)
+  }
+
   def getSortedFilePath(path: String): String = {
     path + SORTED_SUFFIX
   }
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 4e305005e..521e9872d 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar
 RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar
 aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
 ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
+aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar
 classgraph/4.8.138//classgraph-4.8.138.jar
 commons-cli/1.5.0//commons-cli-1.5.0.jar
 commons-crypto/1.0.0//commons-crypto-1.0.0.jar
@@ -27,6 +28,7 @@ commons-lang3/3.13.0//commons-lang3-3.13.0.jar
 commons-logging/1.1.3//commons-logging-1.1.3.jar
 failureaccess/1.0.2//failureaccess-1.0.2.jar
 guava/33.1.0-jre//guava-33.1.0-jre.jar
+hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar
 hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
 hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 947a79db6..eda8edce2 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -122,4 +122,8 @@ license: |
 | celeborn.quota.identity.user-specific.userName | default | false | User name 
if celeborn.quota.identity.provider is 
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 |  | 
 | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. 
| 0.3.0 | celeborn.storage.activeTypes | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
+| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 <!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 5c5bd8ac2..11d2e09f3 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -34,6 +34,7 @@ license: |
 | celeborn.dynamicConfig.store.fs.path | &lt;undefined&gt; | false | The path 
of dynamic config file for fs store backend. The file format should be yaml. 
The default path is `${CELEBORN_CONF_DIR}/dynamicConfig.yaml`. | 0.5.0 |  | 
 | celeborn.internal.port.enabled | false | false | Whether to create a 
internal port on Masters/Workers for inter-Masters/Workers communication. This 
is beneficial when SASL authentication is enforced for all interactions between 
clients and Celeborn Services, but the services can exchange messages without 
being subject to SASL authentication. | 0.5.0 |  | 
 | celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf 
for debugging purposes. | 0.5.0 |  | 
+| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for a 
expire dirs to be deleted on S3 or HDFS. | 0.6.0 |  | 
 | celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial 
partition size for estimation, it will change according to runtime stats. | 
0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize | 
 | celeborn.master.estimatedPartitionSize.maxSize | &lt;undefined&gt; | false | 
Max partition size for estimation. Default value should be 
celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 |  | 
 | celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore 
partition size smaller than this configuration of partition size for 
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate | 
@@ -74,4 +75,8 @@ license: |
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
 | celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | false | Kerberos 
keytab file path for HDFS storage connection. | 0.3.2 |  | 
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | 
Kerberos principal for HDFS storage connection. | 0.3.2 |  | 
+| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 <!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 2b6ec3df8..ed3d15454 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -44,6 +44,10 @@ license: |
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
 | celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | false | Kerberos 
keytab file path for HDFS storage connection. | 0.3.2 |  | 
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | 
Kerberos principal for HDFS storage connection. | 0.3.2 |  | 
+| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the 
number of active connections on a worker exceeds this configuration value, the 
worker will be marked as high-load in the heartbeat report, and the master will 
not include that node in the response of RequestSlots. | 0.3.1 |  | 
 | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size 
of the application registry on Workers. | 0.5.0 |  | 
 | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | false | Threads 
count for read buffer per mount point. | 0.3.0 |  | 
@@ -74,6 +78,8 @@ license: |
 | celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per 
disk used for write data to HDD disks. | 0.2.0 |  | 
 | celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used 
by a HDFS flusher. | 0.3.0 |  | 
 | celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count 
used for write data to HDFS. | 0.2.0 |  | 
+| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by 
a S3 flusher. | 0.6.0 |  | 
+| celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used 
for write data to S3. | 0.6.0 |  | 
 | celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher 
to shutdown. | 0.2.0 |  | 
 | celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count 
per disk used for write data to SSD disks. | 0.2.0 |  | 
 | celeborn.worker.flusher.threads | 16 | false | Flusher's thread count per 
disk for unknown-type disks. | 0.2.0 |  | 
diff --git a/master/pom.xml b/master/pom.xml
index abe061473..a2573e277 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -158,6 +158,26 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>hadoop-aws</id>
+      <activation>
+        <property>
+          <name>hadoop-aws-deps</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+          <version>${aws.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>hadoop-2</id>
       <activation>
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index ced728b2f..eda5a3a2a 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -73,7 +73,8 @@ public class SlotsAllocator {
       for (Map.Entry<String, DiskInfo> diskInfoEntry : 
worker.diskInfos().entrySet()) {
         if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
           if (StorageInfo.localDiskAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.HDFS) {
+              && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.HDFS
+              && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.S3) {
             usableDisks.add(
                 new UsableDiskInfo(
                     diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
@@ -82,6 +83,11 @@ public class SlotsAllocator {
             usableDisks.add(
                 new UsableDiskInfo(
                     diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
+          } else if (StorageInfo.S3Available(availableStorageTypes)
+              && diskInfoEntry.getValue().storageType() == 
StorageInfo.Type.S3) {
+            usableDisks.add(
+                new UsableDiskInfo(
+                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
           }
         }
       }
@@ -123,6 +129,10 @@ public class SlotsAllocator {
       return offerSlotsRoundRobin(
           workers, partitionIds, shouldReplicate, shouldRackAware, 
availableStorageTypes);
     }
+    if (StorageInfo.S3Only(availableStorageTypes)) {
+      return offerSlotsRoundRobin(
+          workers, partitionIds, shouldReplicate, shouldRackAware, 
availableStorageTypes);
+    }
 
     List<DiskInfo> usableDisks = new ArrayList<>();
     Map<DiskInfo, WorkerInfo> diskToWorkerMap = new HashMap<>();
@@ -141,7 +151,8 @@ public class SlotsAllocator {
                                       ? Option.empty()
                                       : Option.apply(diskReserveRatio.get()))
                           && diskInfo.status().equals(DiskStatus.HEALTHY)
-                          && diskInfo.storageType() != StorageInfo.Type.HDFS) {
+                          && diskInfo.storageType() != StorageInfo.Type.HDFS
+                          && diskInfo.storageType() != StorageInfo.Type.S3) {
                         usableDisks.add(diskInfo);
                       }
                     }));
@@ -198,6 +209,8 @@ public class SlotsAllocator {
       DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
       if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, 
availableStorageTypes);
+      } else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.S3, 
availableStorageTypes);
       } else {
         storageInfo =
             new StorageInfo(
@@ -211,6 +224,7 @@ public class SlotsAllocator {
         DiskInfo[] diskInfos =
             selectedWorker.diskInfos().values().stream()
                 .filter(p -> p.storageType() != StorageInfo.Type.HDFS)
+                .filter(p -> p.storageType() != StorageInfo.Type.S3)
                 .collect(Collectors.toList())
                 .toArray(new DiskInfo[0]);
         storageInfo =
@@ -219,6 +233,8 @@ public class SlotsAllocator {
                 diskInfos[diskIndex].storageType(),
                 availableStorageTypes);
         workerDiskIndex.put(selectedWorker, (diskIndex + 1) % 
diskInfos.length);
+      } else if (StorageInfo.S3Available(availableStorageTypes)) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.S3, 
availableStorageTypes);
       } else {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, 
availableStorageTypes);
       }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index f3ab34b67..371e89162 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -212,10 +212,14 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     long healthyDiskNum =
         disks.values().stream().filter(s -> 
s.status().equals(DiskStatus.HEALTHY)).count();
     if (!excludedWorkers.contains(worker)
-        && (((disks.isEmpty() || healthyDiskNum <= 0) && 
!conf.hasHDFSStorage()) || highWorkload)) {
+        && (((disks.isEmpty() || healthyDiskNum <= 0)
+                && (!conf.hasHDFSStorage())
+                && (!conf.hasS3Storage()))
+            || highWorkload)) {
       LOG.debug("Worker: {} num total slots is 0, add to excluded list", 
worker);
       excludedWorkers.add(worker);
-    } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && 
!highWorkload) {
+    } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage() || 
conf.hasS3Storage())
+        && !highWorkload) {
       // only unblack if numSlots larger than 0
       excludedWorkers.remove(worker);
     }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 2ce5fc83f..1e99956e9 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -165,6 +165,7 @@ private[celeborn] class Master(
   private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
   private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
   private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
+  private var checkForS3RemnantDirsTimeOutTask: ScheduledFuture[_] = _
   private val nonEagerHandler = 
ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
 
   // Config constants
@@ -172,8 +173,9 @@ private[celeborn] class Master(
   private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs
   private val workerUnavailableInfoExpireTimeoutMs = 
conf.workerUnavailableInfoExpireTimeout
 
-  private val hdfsExpireDirsTimeoutMS = conf.hdfsExpireDirsTimeoutMS
+  private val dfsExpireDirsTimeoutMS = conf.dfsExpireDirsTimeoutMS
   private val hasHDFSStorage = conf.hasHDFSStorage
+  private val hasS3Storage = conf.hasS3Storage
 
   private val quotaManager = new QuotaManager(conf, configService)
   private val masterResourceConsumptionInterval = 
conf.masterResourceConsumptionInterval
@@ -211,7 +213,7 @@ private[celeborn] class Master(
     TimeUnit.MILLISECONDS)
   private val slotsAssignPolicy = conf.masterSlotAssignPolicy
 
-  private var hadoopFs: FileSystem = _
+  private var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _
   masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
     statusSystem.registeredShuffle.size
   }
@@ -319,15 +321,15 @@ private[celeborn] class Master(
       workerUnavailableInfoExpireTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
 
-    if (hasHDFSStorage) {
-      checkForHDFSRemnantDirsTimeOutTask = 
forwardMessageThread.scheduleWithFixedDelay(
+    if (hasHDFSStorage || hasS3Storage) {
+      checkForS3RemnantDirsTimeOutTask = 
forwardMessageThread.scheduleWithFixedDelay(
         new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(CheckForHDFSExpiredDirsTimeout)
+            self.send(CheckForDFSExpiredDirsTimeout)
           }
         },
-        hdfsExpireDirsTimeoutMS,
-        hdfsExpireDirsTimeoutMS,
+        dfsExpireDirsTimeoutMS,
+        dfsExpireDirsTimeoutMS,
         TimeUnit.MILLISECONDS)
     }
 
@@ -350,6 +352,9 @@ private[celeborn] class Master(
     if (checkForHDFSRemnantDirsTimeOutTask != null) {
       checkForHDFSRemnantDirsTimeOutTask.cancel(true)
     }
+    if (checkForS3RemnantDirsTimeOutTask != null) {
+      checkForS3RemnantDirsTimeOutTask.cancel(true)
+    }
     forwardMessageThread.shutdownNow()
     rackResolver.stop()
     if (authEnabled) {
@@ -380,8 +385,8 @@ private[celeborn] class Master(
       executeWithLeaderChecker(null, timeoutWorkerUnavailableInfos())
     case CheckForApplicationTimeOut =>
       executeWithLeaderChecker(null, timeoutDeadApplications())
-    case CheckForHDFSExpiredDirsTimeout =>
-      executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnHDFS())
+    case CheckForDFSExpiredDirsTimeout =>
+      executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnDFS())
     case pb: PbWorkerLost =>
       val host = pb.getHost
       val rpcPort = pb.getRpcPort
@@ -984,38 +989,44 @@ private[celeborn] class Master(
         workersAssignedToApp.remove(appId)
         statusSystem.handleAppLost(appId, requestId)
         logInfo(s"Removed application $appId")
-        if (hasHDFSStorage) {
-          checkAndCleanExpiredAppDirsOnHDFS(appId)
+        if (hasHDFSStorage || hasS3Storage) {
+          checkAndCleanExpiredAppDirsOnDFS(appId)
         }
         context.reply(ApplicationLostResponse(StatusCode.SUCCESS))
       }
     })
   }
 
-  private def checkAndCleanExpiredAppDirsOnHDFS(expiredDir: String = ""): Unit 
= {
+  private def checkAndCleanExpiredAppDirsOnDFS(expiredDir: String = ""): Unit 
= {
     if (hadoopFs == null) {
       try {
         hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
       } catch {
         case e: Exception =>
-          logError("Celeborn initialize HDFS failed.", e)
+          logError("Celeborn initialize DFS failed.", e)
           throw e
       }
     }
-    val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
-    if (hadoopFs.exists(hdfsWorkPath)) {
+    if (hasHDFSStorage) processDir(conf.hdfsDir, expiredDir)
+    if (hasS3Storage) processDir(conf.s3Dir, expiredDir)
+  }
+
+  private def processDir(dfsDir: String, expiredDir: String): Unit = {
+    val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
+    hadoopFs.asScala.map(_._2).filter(_.exists(dfsWorkPath)).foreach { fs =>
       if (expiredDir.nonEmpty) {
-        val dirToDelete = new Path(hdfsWorkPath, expiredDir)
+        val dirToDelete = new Path(dfsWorkPath, expiredDir)
         // delete specific app dir on application lost
-        CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, dirToDelete, 
true)
+        CelebornHadoopUtils.deleteDFSPathOrLogError(fs, dirToDelete, true)
       } else {
-        val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
+        val iter = fs.listStatusIterator(dfsWorkPath)
         while (iter.hasNext && isMasterActive == 1) {
           val fileStatus = iter.next()
           if 
(!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) {
-            CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, 
fileStatus.getPath, true)
+            CelebornHadoopUtils.deleteDFSPathOrLogError(fs, 
fileStatus.getPath, true)
           }
         }
+
       }
     }
   }
diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml 
b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
index e854f0a2c..3db792a54 100644
--- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
@@ -541,6 +541,7 @@ components:
             - SSD
             - HDFS
             - OSS
+            - S3
         mapIdBitMap:
           type: string
           description: The map id bitmap hint.
diff --git a/pom.xml b/pom.xml
index 85b136aae..ee31504f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,6 @@
 
     <!-- use hadoop-3 as default  -->
     <hadoop.version>3.3.6</hadoop.version>
-
     <!--
     If you change codahale.metrics.version, you also need to change
     the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -1283,6 +1282,25 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>hadoop-aws</id>
+      <properties>
+        <hadoop-aws-deps>true</hadoop-aws-deps>
+        <aws.version>1.12.367</aws.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+          <version>${aws.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>spark-2.4</id>
       <modules>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 903833192..d31854eea 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -49,6 +49,7 @@ object Dependencies {
   val findbugsVersion = "1.3.9"
   val guavaVersion = "33.1.0-jre"
   val hadoopVersion = "3.3.6"
+  val awsVersion = "1.12.367"
   val junitInterfaceVersion = "0.13.3"
   // don't forget update `junitInterfaceVersion` when we upgrade junit
   val junitVersion = "4.13.2"
@@ -110,6 +111,8 @@ object Dependencies {
     ExclusionRule("jline", "jline"),
     ExclusionRule("log4j", "log4j"),
     ExclusionRule("org.slf4j", "slf4j-log4j12"))
+  val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
+  val awsClient = "com.amazonaws" % "aws-java-sdk-bundle" % awsVersion
   val ioDropwizardMetricsCore = "io.dropwizard.metrics" % "metrics-core" % 
metricsVersion
   val ioDropwizardMetricsGraphite = "io.dropwizard.metrics" % 
"metrics-graphite" % metricsVersion excludeAll (
     ExclusionRule("com.rabbitmq", "amqp-client"))
@@ -442,6 +445,13 @@ object Utils {
 }
 
 object CelebornCommon {
+
+  lazy val hadoopAwsDependencies = 
if(profiles.exists(_.startsWith("hadoop-aws"))){
+    Seq(Dependencies.hadoopAws, Dependencies.awsClient)
+  } else {
+    Seq.empty
+  }
+
   lazy val common = Project("celeborn-common", file("common"))
     .settings (
       commonSettings,
@@ -478,7 +488,7 @@ object CelebornCommon {
         // SSL support
         Dependencies.bouncycastleBcprovJdk18on,
         Dependencies.bouncycastleBcpkixJdk18on
-      ) ++ commonUnitTestDependencies,
+      ) ++ commonUnitTestDependencies ++ hadoopAwsDependencies,
 
       Compile / sourceGenerators += Def.task {
         val file = (Compile / sourceManaged).value / "org" / "apache" / 
"celeborn" / "package.scala"
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
index 0b2802035..a4e4d2bca 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
@@ -124,6 +124,8 @@ object ApiUtils {
         locationData.storage(StorageEnum.HDFS)
       case StorageInfo.Type.OSS =>
         locationData.storage(StorageEnum.OSS)
+      case StorageInfo.Type.S3 =>
+        locationData.storage(StorageEnum.S3)
     }
     
Option(partitionLocation.getMapIdBitMap).map(_.toString).foreach(locationData.mapIdBitMap)
     locationData
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index c8b6af9af..368e3485a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -228,7 +228,7 @@ public class MemoryManager {
     this.storageManager = storageManager;
     if (memoryFileStorageThreshold > 0
         && storageManager != null
-        && storageManager.localOrHdfsStorageAvailable()) {
+        && storageManager.localOrDfsStorageAvailable()) {
       ScheduledExecutorService memoryFileStorageService =
           
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
       memoryFileStorageService.scheduleWithFixedDelay(
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
index 4eab8f0b1..d6be27d28 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.meta.MapFileMeta;
 import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.util.FileChannelUtils;
 import org.apache.celeborn.common.util.Utils;
 
@@ -53,6 +55,8 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
   private FileChannel indexChannel;
   private volatile boolean isRegionFinished = true;
 
+  private FileSystem hadoopFs;
+
   public MapPartitionDataWriter(
       StorageManager storageManager,
       AbstractSource workerSource,
@@ -63,11 +67,14 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
     super(storageManager, workerSource, conf, deviceMonitor, writerContext, 
false);
 
     Preconditions.checkState(diskFileInfo != null);
-    if (!diskFileInfo.isHdfs()) {
+    if (!diskFileInfo.isDFS()) {
       indexChannel = 
FileChannelUtils.createWritableFileChannel(diskFileInfo.getIndexPath());
     } else {
+      StorageInfo.Type storageType =
+          diskFileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+      this.hadoopFs = StorageManager.hadoopFs().get(storageType);
       try {
-        StorageManager.hadoopFs().create(diskFileInfo.getHdfsIndexPath(), 
true).close();
+        hadoopFs.create(diskFileInfo.getDfsIndexPath(), true).close();
       } catch (IOException e) {
         try {
           // If create file failed, wait 10 ms and retry
@@ -75,7 +82,7 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
         } catch (InterruptedException ex) {
           throw new RuntimeException(ex);
         }
-        StorageManager.hadoopFs().create(diskFileInfo.getHdfsIndexPath(), 
true).close();
+        hadoopFs.create(diskFileInfo.getDfsIndexPath(), true).close();
       }
     }
   }
@@ -120,12 +127,12 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
     return super.close(
         this::flushIndex,
         () -> {
-          if (diskFileInfo.isHdfs()) {
-            if 
(StorageManager.hadoopFs().exists(diskFileInfo.getHdfsPeerWriterSuccessPath())) 
{
-              StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(), 
false);
+          if (diskFileInfo.isDFS()) {
+            if (hadoopFs.exists(diskFileInfo.getDfsPeerWriterSuccessPath())) {
+              hadoopFs.delete(diskFileInfo.getDfsPath(), false);
               deleted = true;
             } else {
-              
StorageManager.hadoopFs().create(diskFileInfo.getHdfsWriterSuccessPath()).close();
+              hadoopFs.create(diskFileInfo.getDfsWriterSuccessPath()).close();
             }
           }
         },
@@ -133,16 +140,15 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
           if (indexChannel != null) {
             indexChannel.close();
           }
-          if (diskFileInfo.isHdfs()) {
-            if (StorageManager.hadoopFs()
-                .exists(
-                    new Path(
-                        Utils.getWriteSuccessFilePath(
-                            Utils.getPeerPath(diskFileInfo.getIndexPath()))))) 
{
-              
StorageManager.hadoopFs().delete(diskFileInfo.getHdfsIndexPath(), false);
+          if (diskFileInfo.isDFS()) {
+            if (hadoopFs.exists(
+                new Path(
+                    Utils.getWriteSuccessFilePath(
+                        Utils.getPeerPath(diskFileInfo.getIndexPath()))))) {
+              hadoopFs.delete(diskFileInfo.getDfsIndexPath(), false);
               deleted = true;
             } else {
-              StorageManager.hadoopFs()
+              hadoopFs
                   .create(new 
Path(Utils.getWriteSuccessFilePath(diskFileInfo.getIndexPath())))
                   .close();
             }
@@ -255,11 +261,10 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
             while (indexBuffer.hasRemaining()) {
               indexChannel.write(indexBuffer);
             }
-          } else if (diskFileInfo.isHdfs()) {
-            FSDataOutputStream hdfsStream =
-                
StorageManager.hadoopFs().append(diskFileInfo.getHdfsIndexPath());
-            hdfsStream.write(indexBuffer.array());
-            hdfsStream.close();
+          } else if (diskFileInfo.isDFS()) {
+            FSDataOutputStream dfsStream = 
hadoopFs.append(diskFileInfo.getDfsIndexPath());
+            dfsStream.write(indexBuffer.array());
+            dfsStream.close();
           }
         }
         indexBuffer.clear();
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index f40f8ee66..9cae453a6 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hadoop.fs.FileSystem;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,12 +102,15 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
   private final PartitionDataWriterContext writerContext;
   private final long localFlusherBufferSize;
   private final long hdfsFlusherBufferSize;
+
+  private final long s3FlusherBufferSize;
   private Exception exception = null;
   private boolean metricsCollectCriticalEnabled;
   private long chunkSize;
-
   private UserBufferInfo userBufferInfo = null;
 
+  protected FileSystem hadoopFs;
+
   public PartitionDataWriter(
       StorageManager storageManager,
       AbstractSource workerSource,
@@ -128,6 +132,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     this.writerContext = writerContext;
     this.localFlusherBufferSize = conf.workerFlusherBufferSize();
     this.hdfsFlusherBufferSize = conf.workerHdfsFlusherBufferSize();
+    this.s3FlusherBufferSize = conf.workerS3FlusherBufferSize();
     this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
     this.chunkSize = conf.shuffleChunkSize();
 
@@ -166,16 +171,19 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
   }
 
   public void initFileChannelsForDiskFile() throws IOException {
-    if (!this.diskFileInfo.isHdfs()) {
+    if (!this.diskFileInfo.isDFS()) {
       this.flusherBufferSize = localFlusherBufferSize;
       channel = 
FileChannelUtils.createWritableFileChannel(this.diskFileInfo.getFilePath());
     } else {
-      this.flusherBufferSize = hdfsFlusherBufferSize;
-      // We open the stream and close immediately because HDFS output stream 
will
+      StorageInfo.Type storageType =
+          diskFileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+      this.hadoopFs = StorageManager.hadoopFs().get(storageType);
+      this.flusherBufferSize = diskFileInfo.isS3() ? s3FlusherBufferSize : 
hdfsFlusherBufferSize;
+      // We open the stream and close immediately because DFS output stream 
will
       // create a DataStreamer that is a thread.
-      // If we reuse HDFS output stream, we will exhaust the memory soon.
+      // If we reuse DFS output stream, we will exhaust the memory soon.
       try {
-        StorageManager.hadoopFs().create(this.diskFileInfo.getHdfsPath(), 
true).close();
+        hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close();
       } catch (IOException e) {
         try {
           // If create file failed, wait 10 ms and retry
@@ -183,7 +191,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
         } catch (InterruptedException ex) {
           throw new RuntimeException(ex);
         }
-        StorageManager.hadoopFs().create(this.diskFileInfo.getHdfsPath(), 
true).close();
+        hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close();
       }
     }
   }
@@ -220,7 +228,9 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
           if (channel != null) {
             task = new LocalFlushTask(flushBuffer, channel, notifier, false);
           } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier, false);
+            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, false);
+          } else if (diskFileInfo.isS3()) {
+            task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, false);
           }
           MemoryManager.instance().releaseMemoryFileStorage(numBytes);
           MemoryManager.instance().incrementDiskBuffer(numBytes);
@@ -246,7 +256,9 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
             if (channel != null) {
               task = new LocalFlushTask(flushBuffer, channel, notifier, true);
             } else if (diskFileInfo.isHdfs()) {
-              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, true);
+              task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, true);
+            } else if (diskFileInfo.isS3()) {
+              task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, true);
             }
           }
         }
@@ -270,7 +282,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     if (!isMemoryShuffleFile.get()) {
       return false;
     } else {
-      return !storageManager.localOrHdfsStorageAvailable()
+      return !storageManager.localOrDfsStorageAvailable()
           && (memoryFileInfo.getFileLength() > memoryFileStorageMaxFileSize
               || !MemoryManager.instance().memoryFileStorageAvailable());
     }
@@ -325,7 +337,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
         }
       } else {
         if (flushBufferReadableBytes > memoryFileStorageMaxFileSize
-            && storageManager.localOrHdfsStorageAvailable()) {
+            && storageManager.localOrDfsStorageAvailable()) {
           logger.debug(
               "{} Evict, memory buffer is  {}",
               writerContext.getPartitionLocation().getFileName(),
@@ -376,9 +388,11 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
 
   public StorageInfo getStorageInfo() {
     if (diskFileInfo != null) {
-      if (diskFileInfo.isHdfs()) {
+      if (diskFileInfo.isDFS()) {
         if (deleted) {
           return null;
+        } else if (diskFileInfo.isS3()) {
+          return new StorageInfo(StorageInfo.Type.S3, true, 
diskFileInfo.getFilePath());
         } else {
           return new StorageInfo(StorageInfo.Type.HDFS, true, 
diskFileInfo.getFilePath());
         }
@@ -442,7 +456,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       finalClose.run();
 
       // unregister from DeviceMonitor
-      if (diskFileInfo != null && !diskFileInfo.isHdfs()) {
+      if (diskFileInfo != null && !this.diskFileInfo.isDFS()) {
         logger.debug("file info {} unregister from device monitor", 
diskFileInfo);
         deviceMonitor.unregisterFileWriter(this);
       }
@@ -513,10 +527,9 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     if (!destroyed) {
       destroyed = true;
       if (diskFileInfo != null) {
-        diskFileInfo.deleteAllFiles(StorageManager.hadoopFs());
-
+        diskFileInfo.deleteAllFiles(hadoopFs);
         // unregister from DeviceMonitor
-        if (!diskFileInfo.isHdfs()) {
+        if (!diskFileInfo.isDFS()) {
           deviceMonitor.unregisterFileWriter(this);
         }
       }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 4cbd28b46..81bc44eb7 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -45,6 +45,7 @@ import io.netty.buffer.CompositeByteBuf;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.*;
 import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.*;
 import org.apache.celeborn.common.util.ShuffleBlockInfoUtils.ShuffleBlockInfo;
@@ -252,8 +254,8 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
             throw new IOException(
                 "Sort scheduler thread is interrupted means worker is shutting 
down.", e);
           } catch (IOException e) {
-            logger.error("File sorter access HDFS failed.", e);
-            throw new IOException("File sorter access HDFS failed.", e);
+            logger.error("File sorter access DFS failed.", e);
+            throw new IOException("File sorter access DFS failed.", e);
           }
         }
       }
@@ -473,14 +475,17 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
   }
 
   protected void writeIndex(
-      Map<Integer, List<ShuffleBlockInfo>> indexMap, String indexFilePath, 
boolean isHdfs)
+      Map<Integer, List<ShuffleBlockInfo>> indexMap, String indexFilePath, 
boolean isDfs)
       throws IOException {
-    FSDataOutputStream hdfsIndexOutput = null;
+    FSDataOutputStream dfsIndexOutput = null;
     FileChannel indexFileChannel = null;
-    if (isHdfs) {
+    if (isDfs) {
+      boolean isS3 = Utils.isS3Path(indexFilePath);
+      StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : 
StorageInfo.Type.HDFS;
+      FileSystem hadoopFs = StorageManager.hadoopFs().get(storageType);
       // If the index file exists, it will be overwritten.
       // So there is no need to check its existence.
-      hdfsIndexOutput = StorageManager.hadoopFs().create(new 
Path(indexFilePath));
+      dfsIndexOutput = hadoopFs.create(new Path(indexFilePath));
     } else {
       indexFileChannel = 
FileChannelUtils.createWritableFileChannel(indexFilePath);
     }
@@ -505,12 +510,12 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     }
 
     indexBuf.flip();
-    if (isHdfs) {
+    if (isDfs) {
       // Direct byte buffer has no array, so can not invoke indexBuf.array() 
here.
       byte[] tmpBuf = new byte[indexSize];
       indexBuf.get(tmpBuf);
-      hdfsIndexOutput.write(tmpBuf);
-      hdfsIndexOutput.close();
+      dfsIndexOutput.write(tmpBuf);
+      dfsIndexOutput.close();
     } else {
       while (indexBuf.hasRemaining()) {
         indexFileChannel.write(indexBuf);
@@ -588,25 +593,25 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
               fileId,
               () -> {
                 FileChannel indexChannel = null;
-                FSDataInputStream hdfsIndexStream = null;
-                boolean isHdfs = Utils.isHdfsPath(indexFilePath);
+                FSDataInputStream dfsIndexStream = null;
+                boolean isDfs = Utils.isHdfsPath(indexFilePath) || 
Utils.isS3Path(indexFilePath);
+                boolean isS3 = Utils.isS3Path(indexFilePath);
                 int indexSize;
                 try {
-                  if (isHdfs) {
-                    hdfsIndexStream = StorageManager.hadoopFs().open(new 
Path(indexFilePath));
-                    indexSize =
-                        (int)
-                            StorageManager.hadoopFs()
-                                .getFileStatus(new Path(indexFilePath))
-                                .getLen();
+                  if (isDfs) {
+                    StorageInfo.Type storageType =
+                        isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+                    FileSystem hadoopFs = 
StorageManager.hadoopFs().get(storageType);
+                    dfsIndexStream = hadoopFs.open(new Path(indexFilePath));
+                    indexSize = (int) hadoopFs.getFileStatus(new 
Path(indexFilePath)).getLen();
                   } else {
                     indexChannel = 
FileChannelUtils.openReadableFileChannel(indexFilePath);
                     File indexFile = new File(indexFilePath);
                     indexSize = (int) indexFile.length();
                   }
                   ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
-                  if (isHdfs) {
-                    readStreamFully(hdfsIndexStream, indexBuf, indexFilePath);
+                  if (isDfs) {
+                    readStreamFully(dfsIndexStream, indexBuf, indexFilePath);
                   } else {
                     readChannelFully(indexChannel, indexBuf, indexFilePath);
                   }
@@ -623,7 +628,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
                   throw new IOException("Read sorted shuffle file index 
failed.", e);
                 } finally {
                   IOUtils.closeQuietly(indexChannel, null);
-                  IOUtils.closeQuietly(hdfsIndexStream, null);
+                  IOUtils.closeQuietly(dfsIndexStream, null);
                 }
               });
     } catch (ExecutionException e) {
@@ -645,25 +650,30 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     private final String fileId;
     private final String shuffleKey;
     private final boolean isHdfs;
+    private final boolean isS3;
+    private final boolean isDfs;
     private final boolean isPrefetch;
     private final FileInfo originFileInfo;
 
-    private FSDataInputStream hdfsOriginInput = null;
-    private FSDataOutputStream hdfsSortedOutput = null;
+    private FSDataInputStream dfsOriginInput = null;
+    private FSDataOutputStream dfsSortedOutput = null;
     private FileChannel originFileChannel = null;
     private FileChannel sortedFileChannel = null;
+    private FileSystem hadoopFs;
 
     FileSorter(DiskFileInfo fileInfo, String fileId, String shuffleKey) throws 
IOException {
       this.originFileInfo = fileInfo;
       this.originFilePath = fileInfo.getFilePath();
       this.sortedFilePath = Utils.getSortedFilePath(originFilePath);
       this.isHdfs = fileInfo.isHdfs();
-      this.isPrefetch = !isHdfs && prefetchEnabled;
+      this.isS3 = fileInfo.isS3();
+      this.isDfs = isHdfs || isS3;
+      this.isPrefetch = !isDfs && prefetchEnabled;
       this.originFileLen = fileInfo.getFileLength();
       this.fileId = fileId;
       this.shuffleKey = shuffleKey;
       this.indexFilePath = Utils.getIndexFilePath(originFilePath);
-      if (!isHdfs) {
+      if (!isDfs) {
         File sortedFile = new File(this.sortedFilePath);
         if (sortedFile.exists()) {
           sortedFile.delete();
@@ -673,11 +683,14 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           indexFile.delete();
         }
       } else {
-        if (StorageManager.hadoopFs().exists(fileInfo.getHdfsSortedPath())) {
-          StorageManager.hadoopFs().delete(fileInfo.getHdfsSortedPath(), 
false);
+        boolean isS3 = Utils.isS3Path(indexFilePath);
+        StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : 
StorageInfo.Type.HDFS;
+        this.hadoopFs = StorageManager.hadoopFs().get(storageType);
+        if (hadoopFs.exists(fileInfo.getDfsSortedPath())) {
+          hadoopFs.delete(fileInfo.getDfsSortedPath(), false);
         }
-        if (StorageManager.hadoopFs().exists(fileInfo.getHdfsIndexPath())) {
-          StorageManager.hadoopFs().delete(fileInfo.getHdfsIndexPath(), false);
+        if (hadoopFs.exists(fileInfo.getDfsIndexPath())) {
+          hadoopFs.delete(fileInfo.getDfsIndexPath(), false);
         }
       }
     }
@@ -748,7 +761,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);
         }
 
-        writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
+        writeIndex(sortedBlockInfoMap, indexFilePath, isDfs);
         updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
         originFileInfo.getReduceFileMeta().setSorted();
         cleaner.add(this);
@@ -775,10 +788,9 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     }
 
     private void initializeFiles() throws IOException {
-      if (isHdfs) {
-        hdfsOriginInput = StorageManager.hadoopFs().open(new 
Path(originFilePath));
-        hdfsSortedOutput =
-            StorageManager.hadoopFs().create(new Path(sortedFilePath), true, 
256 * 1024);
+      if (isDfs) {
+        dfsOriginInput = hadoopFs.open(new Path(originFilePath));
+        dfsSortedOutput = hadoopFs.create(new Path(sortedFilePath), true, 256 
* 1024);
       } else {
         originFileChannel = 
FileChannelUtils.openReadableFileChannel(originFilePath);
         sortedFileChannel = 
FileChannelUtils.createWritableFileChannel(sortedFilePath);
@@ -786,23 +798,23 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     }
 
     private void closeFiles() {
-      IOUtils.closeQuietly(hdfsOriginInput, null);
-      IOUtils.closeQuietly(hdfsSortedOutput, null);
+      IOUtils.closeQuietly(dfsOriginInput, null);
+      IOUtils.closeQuietly(dfsSortedOutput, null);
       IOUtils.closeQuietly(originFileChannel, null);
       IOUtils.closeQuietly(sortedFileChannel, null);
     }
 
     private void readBufferFully(ByteBuffer buffer) throws IOException {
-      if (isHdfs) {
-        readStreamFully(hdfsOriginInput, buffer, originFilePath);
+      if (isDfs) {
+        readStreamFully(dfsOriginInput, buffer, originFilePath);
       } else {
         readChannelFully(originFileChannel, buffer, originFilePath);
       }
     }
 
     private long transferBlock(long offset, long length) throws IOException {
-      if (isHdfs) {
-        return transferStreamFully(hdfsOriginInput, hdfsSortedOutput, offset, 
length);
+      if (isDfs) {
+        return transferStreamFully(dfsOriginInput, dfsSortedOutput, offset, 
length);
       } else {
         return transferChannelFully(originFileChannel, sortedFileChannel, 
offset, length);
       }
@@ -810,8 +822,8 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
     public void deleteOriginFiles() throws IOException {
       boolean deleteSuccess;
-      if (isHdfs) {
-        deleteSuccess = StorageManager.hadoopFs().delete(new 
Path(originFilePath), false);
+      if (isDfs) {
+        deleteSuccess = hadoopFs.delete(new Path(originFilePath), false);
       } else {
         deleteSuccess = new File(originFilePath).delete();
       }
@@ -849,9 +861,9 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     }
 
     private void readBufferBySize(ByteBuffer buffer, int toRead) throws 
IOException {
-      if (isHdfs) {
-        // HDFS does not need to prefetch.
-        hdfsOriginInput.seek(toRead + hdfsOriginInput.getPos());
+      if (isDfs) {
+        // DFS does not need to prefetch.
+        dfsOriginInput.seek(toRead + dfsOriginInput.getPos());
       } else if (prefetchEnabled) {
         buffer.clear();
         readChannelBySize(originFileChannel, buffer, originFilePath, toRead);
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
index 20910ab88..0c1980e05 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
@@ -88,17 +88,14 @@ public final class ReducePartitionDataWriter extends 
PartitionDataWriter {
             },
             () -> {
               if (diskFileInfo != null) {
-                if (diskFileInfo.isHdfs()) {
-                  if (StorageManager.hadoopFs()
-                      .exists(diskFileInfo.getHdfsPeerWriterSuccessPath())) {
-                    
StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(), false);
+                if (diskFileInfo.isDFS()) {
+                  if 
(hadoopFs.exists(diskFileInfo.getDfsPeerWriterSuccessPath())) {
+                    hadoopFs.delete(diskFileInfo.getDfsPath(), false);
                     deleted = true;
                   } else {
-                    StorageManager.hadoopFs()
-                        .create(diskFileInfo.getHdfsWriterSuccessPath())
-                        .close();
+                    
hadoopFs.create(diskFileInfo.getDfsWriterSuccessPath()).close();
                     FSDataOutputStream indexOutputStream =
-                        
StorageManager.hadoopFs().create(diskFileInfo.getHdfsIndexPath());
+                        hadoopFs.create(diskFileInfo.getDfsIndexPath());
                     indexOutputStream.writeInt(
                         
(diskFileInfo.getReduceFileMeta()).getChunkOffsets().size());
                     for (Long offset : 
(diskFileInfo.getReduceFileMeta()).getChunkOffsets()) {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 8deac0905..e59014120 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -164,7 +164,7 @@ private[deploy] class Controller(
       return
     }
 
-    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage) 
{
+    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage 
&& !conf.hasS3Storage) {
       val msg = "Local storage has no available dirs!"
       logError(s"[handleReserveSlots] $msg")
       context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, 
msg))
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 934a4e967..dc4f27bdf 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -274,6 +274,12 @@ class FetchHandler(
               shuffleKey,
               fileName)
             makeStreamHandler(streamId, numChunks = 0)
+          case info: DiskFileInfo if info.isS3 =>
+            chunkStreamManager.registerStream(
+              streamId,
+              shuffleKey,
+              fileName)
+            makeStreamHandler(streamId, numChunks = 0)
           case _ =>
             val managedBuffer = fileInfo match {
               case df: DiskFileInfo =>
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 0fe387f58..11b8871e9 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -45,7 +45,7 @@ import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
-import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, 
LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, 
LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, 
StorageManager}
 
 class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler with Logging {
 
@@ -1185,7 +1185,8 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
   }
 
   private def checkDiskFull(fileWriter: PartitionDataWriter): Boolean = {
-    if (fileWriter.flusher == null || 
fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
+    if (fileWriter.flusher == null || fileWriter.flusher.isInstanceOf[
+        HdfsFlusher] || fileWriter.flusher.isInstanceOf[S3Flusher]) {
       return false
     }
     val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 2f8767383..ae15dbc73 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -457,7 +457,7 @@ private[celeborn] class Worker(
     val diskInfos =
       workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { 
disk =>
         disk.mountPoint -> disk
-      }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo
+      }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++ 
storageManager.s3DiskInfo
     workerStatusManager.checkIfNeedTransitionStatus()
     val response = masterClient.askSync[HeartbeatFromWorkerResponse](
       HeartbeatFromWorker(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 607e47ddc..df0d63be3 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -21,6 +21,9 @@ import java.nio.channels.FileChannel
 
 import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.IOUtils
+
+import org.apache.celeborn.common.protocol.StorageInfo.Type
 
 abstract private[worker] class FlushTask(
     val buffer: CompositeByteBuf,
@@ -50,8 +53,38 @@ private[worker] class HdfsFlushTask(
     notifier: FlushNotifier,
     keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
   override def flush(): Unit = {
-    val hdfsStream = StorageManager.hadoopFs.append(path, 256 * 1024)
+    val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
+    val hdfsStream = hadoopFs.append(path, 256 * 1024)
     hdfsStream.write(ByteBufUtil.getBytes(buffer))
     hdfsStream.close()
   }
 }
+
+private[worker] class S3FlushTask(
+    buffer: CompositeByteBuf,
+    val path: Path,
+    notifier: FlushNotifier,
+    keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
+  override def flush(): Unit = {
+    val hadoopFs = StorageManager.hadoopFs.get(Type.S3)
+    if (hadoopFs.exists(path)) {
+      val conf = hadoopFs.getConf
+      val tempPath = new Path(path.getParent, path.getName + ".tmp")
+      val outputStream = hadoopFs.create(tempPath, true, 256 * 1024)
+      val inputStream = hadoopFs.open(path)
+      try {
+        IOUtils.copyBytes(inputStream, outputStream, conf, false)
+      } finally {
+        inputStream.close()
+      }
+      outputStream.write(ByteBufUtil.getBytes(buffer))
+      outputStream.close()
+      hadoopFs.delete(path, false)
+      hadoopFs.rename(tempPath, path)
+    } else {
+      val s3Stream = hadoopFs.create(path, true, 256 * 1024)
+      s3Stream.write(ByteBufUtil.getBytes(buffer))
+      s3Stream.close()
+    }
+  }
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index cfc94e962..30194aa83 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -198,3 +198,22 @@ final private[worker] class HdfsFlusher(
 
   override def toString: String = s"HdfsFlusher@$flusherId"
 }
+
+final private[worker] class S3Flusher(
+    workerSource: AbstractSource,
+    s3FlusherThreads: Int,
+    allocator: PooledByteBufAllocator,
+    maxComponents: Int) extends Flusher(
+    workerSource,
+    s3FlusherThreads,
+    allocator,
+    maxComponents,
+    null,
+    "S3") with Logging {
+
+  override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
+    logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
+  }
+
+  override def toString: String = s"s3Flusher@$flusherId"
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 34579327a..1bfca9b64 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -57,6 +57,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   val workingDirWriters =
     JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, 
PartitionDataWriter]]()
   val hdfsWriters = JavaUtils.newConcurrentHashMap[String, 
PartitionDataWriter]()
+  val s3Writers = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
   val memoryWriters = JavaUtils.newConcurrentHashMap[MemoryFileInfo, 
PartitionDataWriter]()
   // (shuffleKey->(filename->DiskFileInfo))
   private val diskFileInfos =
@@ -67,6 +68,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
   val hasHDFSStorage = conf.hasHDFSStorage
 
+  val hasS3Storage = conf.hasS3Storage
+
   val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
   val storagePolicy = new StoragePolicy(conf, this, workerSource)
 
@@ -77,7 +80,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         (new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread, 
storageType)
       }
 
-    if (workingDirInfos.size <= 0 && !hasHDFSStorage) {
+    if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage) {
       throw new IOException("Empty working directory configuration!")
     }
 
@@ -89,6 +92,11 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.HDFS))
     else None
 
+  val s3DiskInfo =
+    if (conf.hasS3Storage)
+      Option(new DiskInfo("S3", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.S3))
+    else None
+
   def disksSnapshot(): List[DiskInfo] = {
     diskInfos.synchronized {
       val disks = new util.ArrayList[DiskInfo](diskInfos.values())
@@ -149,6 +157,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   deviceMonitor.startCheck()
 
   val hdfsDir = conf.hdfsDir
+  val s3Dir = conf.s3Dir
   val hdfsPermission = new FsPermission("755")
   val (hdfsFlusher, _totalHdfsFlusherThread) =
     if (hasHDFSStorage) {
@@ -171,12 +180,36 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       (None, 0)
     }
 
-  def totalFlusherThread: Int = _totalLocalFlusherThread + 
_totalHdfsFlusherThread
+  val (s3Flusher, _totalS3FlusherThread) =
+    if (hasS3Storage) {
+      logInfo(s"Initialize S3 support with path $s3Dir")
+      try {
+        StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
+      } catch {
+        case e: Exception =>
+          logError("Celeborn initialize S3 failed.", e)
+          throw e
+      }
+      (
+        Some(new S3Flusher(
+          workerSource,
+          conf.workerS3FlusherThreads,
+          storageBufferAllocator,
+          conf.workerPushMaxComponents)),
+        conf.workerS3FlusherThreads)
+    } else {
+      (None, 0)
+    }
+
+  def totalFlusherThread: Int =
+    _totalLocalFlusherThread + _totalHdfsFlusherThread + _totalS3FlusherThread
+
   val activeTypes = conf.availableStorageTypes
 
-  def localOrHdfsStorageAvailable(): Boolean = {
-    StorageInfo.HDFSAvailable(activeTypes) || StorageInfo.localDiskAvailable(
-      activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty
+  def localOrDfsStorageAvailable(): Boolean = {
+    StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
+      activeTypes) || StorageInfo.localDiskAvailable(
+      activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty
   }
 
   override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = 
this.synchronized {
@@ -383,7 +416,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       rangeReadFilter: Boolean,
       userIdentifier: UserIdentifier,
       partitionSplitEnabled: Boolean): PartitionDataWriter = {
-    if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) {
+    if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) {
       throw new IOException("No available working dirs!")
     }
     val partitionDataWriterContext = new PartitionDataWriterContext(
@@ -445,6 +478,10 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       hdfsWriters.put(fileInfo.getFilePath, writer)
       return
     }
+    if (writer.getDiskFileInfo.isS3) {
+      s3Writers.put(fileInfo.getFilePath, writer)
+      return
+    }
     deviceMonitor.registerFileWriter(writer)
     workingDirWriters.computeIfAbsent(workingDir, 
workingDirWriterListFunc).put(
       fileInfo.getFilePath,
@@ -506,17 +543,25 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
   def cleanFileInternal(shuffleKey: String, fileInfo: FileInfo): Boolean = {
     if (fileInfo == null) return false
-    var isHdfsExpired = false
+    var isDfsExpired = false
     fileInfo match {
       case info: DiskFileInfo =>
         if (info.isHdfs) {
-          isHdfsExpired = true
+          isDfsExpired = true
           val hdfsFileWriter = hdfsWriters.get(info.getFilePath)
           if (hdfsFileWriter != null) {
             hdfsFileWriter.destroy(new IOException(
               s"Destroy FileWriter $hdfsFileWriter caused by shuffle 
$shuffleKey expired."))
             hdfsWriters.remove(info.getFilePath)
           }
+        } else if (info.isS3) {
+          isDfsExpired = true
+          val s3FileWriter = s3Writers.get(info.getFilePath)
+          if (s3FileWriter != null) {
+            s3FileWriter.destroy(new IOException(
+              s"Destroy FileWriter $s3FileWriter caused by shuffle $shuffleKey 
expired."))
+            s3Writers.remove(info.getFilePath)
+          }
         } else {
           val workingDir =
             info.getFile.getParentFile.getParentFile.getParentFile
@@ -537,7 +582,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       case _ =>
     }
 
-    isHdfsExpired
+    isDfsExpired
   }
 
   def cleanupExpiredShuffleKey(
@@ -547,12 +592,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       logInfo(s"Cleanup expired shuffle $shuffleKey.")
       if (diskFileInfos.containsKey(shuffleKey)) {
         val removedFileInfos = diskFileInfos.remove(shuffleKey)
-        var isHdfsExpired = false
+        var isDfsExpired = false
+        var isHdfs = false
         if (removedFileInfos != null) {
           removedFileInfos.asScala.foreach {
             case (_, fileInfo) =>
               if (cleanFileInternal(shuffleKey, fileInfo)) {
-                isHdfsExpired = true
+                isDfsExpired = true
+                isHdfs = fileInfo.isHdfs
               }
           }
         }
@@ -565,13 +612,16 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
             deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
           }
         }
-        if (isHdfsExpired) {
+        if (isDfsExpired) {
           try {
-            StorageManager.hadoopFs.delete(
-              new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId"),
+            val dir = if (hasHDFSStorage && isHdfs) hdfsDir else s3Dir
+            val storageInfo =
+              if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS else 
StorageInfo.Type.S3
+            StorageManager.hadoopFs.get(storageInfo).delete(
+              new Path(new Path(dir, conf.workerWorkingDir), 
s"$appId/$shuffleId"),
               true)
           } catch {
-            case e: Exception => logWarning("Clean expired HDFS shuffle 
failed.", e)
+            case e: Exception => logWarning("Clean expired DFS shuffle 
failed.", e)
           }
         }
         if (workerGracefulShutdown) {
@@ -671,12 +721,13 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           }
         }
 
-      val hdfsCleaned = hadoopFs match {
-        case hdfs: FileSystem =>
-          val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
-          // HDFS path not exist when first time initialize
-          if (hdfs.exists(hdfsWorkPath)) {
-            !hdfs.listFiles(hdfsWorkPath, false).hasNext
+      val dfsCleaned = hadoopFs match {
+        case dfs: FileSystem =>
+          val dfsDir = if (hasHDFSStorage) hdfsDir else s3Dir
+          val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
+          // DFS path not exist when first time initialize
+          if (dfs.exists(dfsWorkPath)) {
+            !dfs.listFiles(dfsWorkPath, false).hasNext
           } else {
             true
           }
@@ -684,7 +735,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           true
       }
 
-      if (localCleaned && hdfsCleaned) {
+      if (localCleaned && dfsCleaned) {
         return true
       }
       retryTimes += 1
@@ -763,6 +814,11 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         u.flushOnMemoryPressure()
       }
     })
+    s3Writers.forEach(new BiConsumer[String, PartitionDataWriter] {
+      override def accept(t: String, u: PartitionDataWriter): Unit = {
+        u.flushOnMemoryPressure()
+      }
+    })
   }
 
   override def onPause(moduleName: String): Unit = {}
@@ -838,7 +894,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       fileInfos: List[DiskFileInfo],
       subResourceConsumptions: util.Map[String, ResourceConsumption] = null)
       : ResourceConsumption = {
-    val diskFileInfos = fileInfos.filter(!_.isHdfs)
+    val diskFileInfos = fileInfos.filter(!_.isDFS)
     val hdfsFileInfos = fileInfos.filter(_.isHdfs)
     ResourceConsumption(
       diskFileInfos.map(_.getFileLength).sum,
@@ -884,7 +940,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         null,
         null,
         null)
-    } else if (location.getStorageInfo.localDiskAvailable() || 
location.getStorageInfo.HDFSAvailable()) {
+    } else if (location.getStorageInfo.localDiskAvailable() || 
location.getStorageInfo.HDFSAvailable() || 
location.getStorageInfo.S3Available()) {
       logDebug(s"create non-memory file for 
${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
       val createDiskFileResult = createDiskFile(
         location,
@@ -953,14 +1009,17 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
             s" working dirs. diskInfo $diskInfo")
           healthyWorkingDirs()
         }
-      if (dirs.isEmpty && hdfsFlusher.isEmpty) {
+      if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty) {
         throw new IOException(s"No available disks! suggested mountPoint 
$suggestedMountPoint")
       }
 
       if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
         val shuffleDir =
           new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId")
-        FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
+        FileSystem.mkdirs(
+          StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
+          shuffleDir,
+          hdfsPermission)
         val hdfsFilePath = new Path(shuffleDir, fileName).toString
         val hdfsFileInfo = new DiskFileInfo(
           userIdentifier,
@@ -972,6 +1031,24 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           fileName,
           hdfsFileInfo)
         return (hdfsFlusher.get, hdfsFileInfo, null)
+      } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
+        val shuffleDir =
+          new Path(new Path(s3Dir, conf.workerWorkingDir), 
s"$appId/$shuffleId")
+        FileSystem.mkdirs(
+          StorageManager.hadoopFs.get(StorageInfo.Type.S3),
+          shuffleDir,
+          hdfsPermission)
+        val s3FilePath = new Path(shuffleDir, fileName).toString
+        val s3FileInfo = new DiskFileInfo(
+          userIdentifier,
+          partitionSplitEnabled,
+          new ReduceFileMeta(conf.shuffleChunkSize),
+          s3FilePath,
+          StorageInfo.Type.S3)
+        diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
+          fileName,
+          s3FileInfo)
+        return (s3Flusher.get, s3FileInfo, null)
       } else if (dirs.nonEmpty && 
location.getStorageInfo.localDiskAvailable()) {
         val dir = dirs(getNextIndex % dirs.size)
         val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, 
mountPoints)
@@ -1037,5 +1114,5 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 }
 
 object StorageManager {
-  var hadoopFs: FileSystem = _
+  var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 228063d81..20c56ee8c 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -61,7 +61,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
               partitionDataWriterContext.isPartitionSplitEnabled)
             partitionDataWriterContext.setStorageType(storageInfoType)
             new CelebornMemoryFile(conf, source, memoryFileInfo, 
storageInfoType)
-          case StorageInfo.Type.HDD | StorageInfo.Type.SSD | 
StorageInfo.Type.HDFS | StorageInfo.Type.OSS =>
+          case StorageInfo.Type.HDD | StorageInfo.Type.SSD | 
StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
             logDebug(s"create non-memory file for 
${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
             val (flusher, diskFileInfo, workingDir) = 
storageManager.createDiskFile(
               location,
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index cf71200dc..e2d9f6a41 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -110,7 +110,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
     storageManager = Mockito.mock(StorageManager.class);
     AtomicLong evictCount = new AtomicLong();
     Mockito.when(storageManager.evictedFileCount()).thenAnswer(a -> 
evictCount);
-    Mockito.when(storageManager.localOrHdfsStorageAvailable()).thenAnswer(a -> 
true);
+    Mockito.when(storageManager.localOrDfsStorageAvailable()).thenAnswer(a -> 
true);
     Mockito.when(storageManager.storageBufferAllocator()).thenAnswer(a -> 
allocator);
     MemoryManager.initialize(conf, storageManager);
   }

Reply via email to