This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new dfac04f6e [VL] Support celeborn sort based shuffle (#5675)
dfac04f6e is described below

commit dfac04f6e9f471a67248a2b9a2e582c9d6f22597
Author: Kerwin Zhang <xiyu...@alibaba-inc.com>
AuthorDate: Fri May 17 13:44:54 2024 +0800

    [VL] Support celeborn sort based shuffle (#5675)
---
 cpp/core/jni/JniCommon.h                           |  18 +-
 cpp/core/jni/JniWrapper.cc                         |  30 +-
 cpp/core/shuffle/FallbackRangePartitioner.cc       |  18 +
 cpp/core/shuffle/FallbackRangePartitioner.h        |   6 +
 cpp/core/shuffle/HashPartitioner.cc                |  49 ++-
 cpp/core/shuffle/HashPartitioner.h                 |   6 +
 cpp/core/shuffle/LocalPartitionWriter.cc           |   4 +
 cpp/core/shuffle/LocalPartitionWriter.h            |   2 +
 cpp/core/shuffle/Options.h                         |  16 +-
 cpp/core/shuffle/PartitionWriter.h                 |   6 +
 cpp/core/shuffle/Partitioner.h                     |   9 +
 cpp/core/shuffle/RoundRobinPartitioner.cc          |  16 +
 cpp/core/shuffle/RoundRobinPartitioner.h           |   6 +
 cpp/core/shuffle/ShuffleReader.cc                  |   4 +
 cpp/core/shuffle/ShuffleReader.h                   |   6 +-
 cpp/core/shuffle/ShuffleWriter.h                   |  12 +-
 cpp/core/shuffle/SinglePartitioner.cc              |   9 +
 cpp/core/shuffle/SinglePartitioner.h               |   6 +
 cpp/core/shuffle/rss/RssClient.h                   |   2 +-
 cpp/core/shuffle/rss/RssPartitionWriter.cc         |   7 +
 cpp/core/shuffle/rss/RssPartitionWriter.h          |   2 +
 cpp/velox/CMakeLists.txt                           |   3 +-
 cpp/velox/benchmarks/GenericBenchmark.cc           |   5 +-
 cpp/velox/benchmarks/ShuffleSplitBenchmark.cc      |   9 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  33 +-
 ...fleWriter.cc => VeloxHashBasedShuffleWriter.cc} | 161 +++-----
 ...uffleWriter.h => VeloxHashBasedShuffleWriter.h} |  92 +----
 cpp/velox/shuffle/VeloxShuffleReader.cc            | 115 +++++-
 cpp/velox/shuffle/VeloxShuffleReader.h             |  51 ++-
 cpp/velox/shuffle/VeloxShuffleWriter.h             | 405 ++++-----------------
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc   | 317 ++++++++++++++++
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h    | 117 ++++++
 cpp/velox/tests/VeloxShuffleWriterTest.cc          |  24 +-
 cpp/velox/utils/tests/LocalRssClient.h             |   2 +-
 cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 102 ++++--
 .../gluten/celeborn/CelebornShuffleManager.java    |   5 -
 .../CelebornHashBasedColumnarShuffleWriter.scala   |   8 +
 .../VeloxCelebornColumnarBatchSerializer.scala     |   6 +-
 ...loxCelebornHashBasedColumnarShuffleWriter.scala |  10 +-
 .../gluten/vectorized/ShuffleReaderJniWrapper.java |   3 +-
 .../gluten/vectorized/ShuffleWriterJniWrapper.java |  16 +-
 .../vectorized/ColumnarBatchSerializer.scala       |   4 +-
 .../spark/shuffle/ColumnarShuffleWriter.scala      |   2 +-
 .../writer/VeloxUniffleColumnarShuffleWriter.java  |   6 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     |   5 +
 45 files changed, 1104 insertions(+), 631 deletions(-)

diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 29c38689c..aa3b2b884 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -280,6 +280,20 @@ static inline arrow::Compression::type getCompressionType(JNIEnv* env, jstring c
   return compressionType;
 }
 
+static inline std::string getCompressionTypeStr(JNIEnv* env, jstring codecJstr) {
+  if (codecJstr == nullptr) {
+    return "none";
+  }
+  auto codec = env->GetStringUTFChars(codecJstr, nullptr);
+
+  // Convert codec string into lowercase (via unsigned char: tolower on a negative char is UB).
+  std::string codecLower;
+  std::transform(codec, codec + std::strlen(codec), std::back_inserter(codecLower), [](unsigned char c) { return static_cast<char>(::tolower(c)); });
+
+  env->ReleaseStringUTFChars(codecJstr, codec);
+  return codecLower;
+}
+
 static inline gluten::CodecBackend getCodecBackend(JNIEnv* env, jstring codecJstr) {
   if (codecJstr == nullptr) {
     return gluten::CodecBackend::NONE;
@@ -444,7 +458,7 @@ class JavaRssClient : public RssClient {
     env->DeleteGlobalRef(array_);
   }
 
-  int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t size) override {
+  int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) override {
     JNIEnv* env;
     if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
       throw gluten::GlutenException("JNIEnv was not attached to current thread");
@@ -457,7 +471,7 @@ class JavaRssClient : public RssClient {
       array_ = env->NewByteArray(size);
       array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_));
     }
-    env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<jbyte*>(bytes));
+    env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<const jbyte*>(bytes));
     jint javaBytesSize = env->CallIntMethod(javaRssShuffleWriter_, javaPushPartitionData_, partitionId, array_, size);
     checkException(env);
     return static_cast<int32_t>(javaBytesSize);
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 7363a9da0..e70a017e0 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -831,8 +831,10 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     jlong taskAttemptId,
     jint startPartitionId,
     jint pushBufferMaxSize,
+    jlong sortBufferMaxSize,
     jobject partitionPusher,
-    jstring partitionWriterTypeJstr) {
+    jstring partitionWriterTypeJstr,
+    jstring shuffleWriterTypeJstr) {
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
   auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);
@@ -866,10 +868,12 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       .mergeThreshold = mergeThreshold,
       .compressionThreshold = compressionThreshold,
       .compressionType = getCompressionType(env, codecJstr),
+      .compressionTypeStr = getCompressionTypeStr(env, codecJstr),
       .compressionLevel = compressionLevel,
       .bufferedWrite = true,
       .numSubDirs = numSubDirs,
-      .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize : 
kDefaultShuffleWriterBufferSize};
+      .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize : 
kDefaultPushMemoryThreshold,
+      .sortBufferMaxSize = sortBufferMaxSize > 0 ? sortBufferMaxSize : 
kDefaultSortBufferThreshold};
   if (codecJstr != NULL) {
     partitionWriterOptions.codecBackend = getCodecBackend(env, 
codecBackendJstr);
     partitionWriterOptions.compressionMode = getCompressionMode(env, 
compressionModeJstr);
@@ -879,6 +883,15 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
   auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr, 
JNI_FALSE);
   auto partitionWriterType = std::string(partitionWriterTypeC);
   env->ReleaseStringUTFChars(partitionWriterTypeJstr, partitionWriterTypeC);
+
+  auto shuffleWriterTypeC = env->GetStringUTFChars(shuffleWriterTypeJstr, 
JNI_FALSE);
+  auto shuffleWriterType = std::string(shuffleWriterTypeC);
+  env->ReleaseStringUTFChars(shuffleWriterTypeJstr, shuffleWriterTypeC);
+
+  if (shuffleWriterType == "sort") {
+    shuffleWriterOptions.shuffleWriterType = kSortShuffle;
+  }
+
   if (partitionWriterType == "local") {
     if (dataFileJstr == NULL) {
       throw gluten::GlutenException(std::string("Shuffle DataFile can't be 
null"));
@@ -962,7 +975,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
   JNI_METHOD_END(kInvalidResourceHandle)
 }
 
-JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_split( // NOLINT
+JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_write( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong shuffleWriterHandle,
@@ -981,7 +994,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
   // The column batch maybe VeloxColumnBatch or 
ArrowCStructColumnarBatch(FallbackRangeShuffleWriter)
   auto batch = ctx->objectStore()->retrieve<ColumnarBatch>(batchHandle);
   auto numBytes = batch->numBytes();
-  gluten::arrowAssertOkOrThrow(shuffleWriter->split(batch, memLimit), "Native 
split: shuffle writer split failed");
+  gluten::arrowAssertOkOrThrow(shuffleWriter->write(batch, memLimit), "Native 
write: shuffle writer failed");
   return numBytes;
   JNI_METHOD_END(kInvalidResourceHandle)
 }
@@ -1058,7 +1071,8 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
     jlong memoryManagerHandle,
     jstring compressionType,
     jstring compressionBackend,
-    jint batchSize) {
+    jint batchSize,
+    jstring shuffleWriterType) {
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
   auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);
@@ -1066,11 +1080,16 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
   auto pool = memoryManager->getArrowMemoryPool();
   ShuffleReaderOptions options = ShuffleReaderOptions{};
   options.compressionType = getCompressionType(env, compressionType);
+  options.compressionTypeStr = getCompressionTypeStr(env, compressionType);
   if (compressionType != nullptr) {
     options.codecBackend = getCodecBackend(env, compressionBackend);
   }
   options.batchSize = batchSize;
   // TODO: Add coalesce option and maximum coalesced size.
+
+  if (jStringToCString(env, shuffleWriterType) == "sort") {
+    options.shuffleWriterType = kSortShuffle;
+  }
   std::shared_ptr<arrow::Schema> schema =
       gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct 
ArrowSchema*>(cSchema)));
 
@@ -1085,7 +1104,6 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
     jobject jniIn) {
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
-
   auto reader = 
ctx->objectStore()->retrieve<ShuffleReader>(shuffleReaderHandle);
   std::shared_ptr<arrow::io::InputStream> in = 
std::make_shared<JavaInputStreamAdaptor>(env, reader->getPool(), jniIn);
   auto outItr = reader->readStream(in);
diff --git a/cpp/core/shuffle/FallbackRangePartitioner.cc b/cpp/core/shuffle/FallbackRangePartitioner.cc
index 4bad50b51..677fcd114 100644
--- a/cpp/core/shuffle/FallbackRangePartitioner.cc
+++ b/cpp/core/shuffle/FallbackRangePartitioner.cc
@@ -39,4 +39,22 @@ arrow::Status gluten::FallbackRangePartitioner::compute(
   return arrow::Status::OK();
 }
 
+arrow::Status gluten::FallbackRangePartitioner::compute(
+    const int32_t* pidArr,
+    const int64_t numRows,
+    const int32_t vectorIndex,
+    std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+  auto index = static_cast<int64_t>(vectorIndex) << 32;
+  for (int64_t i = 0; i < numRows; ++i) {
+    auto pid = pidArr[i];
+    if (pid >= numPartitions_) { // Validate before recording, so an invalid pid never enters the map.
+      return arrow::Status::Invalid(
+          "Partition id ", std::to_string(pid), " is equal or greater than ", std::to_string(numPartitions_));
+    }
+    int64_t combined = index | (i & 0xFFFFFFFFLL); // High 32 bits: vector index, low 32 bits: row index.
+    auto& vec = rowVectorIndexMap[pid];
+    vec.push_back(combined);
+  }
+  return arrow::Status::OK();
+}
 } // namespace gluten
diff --git a/cpp/core/shuffle/FallbackRangePartitioner.h 
b/cpp/core/shuffle/FallbackRangePartitioner.h
index f54dd1abc..b06ce7e17 100644
--- a/cpp/core/shuffle/FallbackRangePartitioner.h
+++ b/cpp/core/shuffle/FallbackRangePartitioner.h
@@ -30,6 +30,12 @@ class FallbackRangePartitioner final : public Partitioner {
       const int64_t numRows,
       std::vector<uint32_t>& row2partition,
       std::vector<uint32_t>& partition2RowCount) override;
+
+  arrow::Status compute(
+      const int32_t* pidArr,
+      const int64_t numRows,
+      const int32_t vectorIndex,
+      std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) 
override;
 };
 
 } // namespace gluten
diff --git a/cpp/core/shuffle/HashPartitioner.cc b/cpp/core/shuffle/HashPartitioner.cc
index c62e3185f..4a26dc67b 100644
--- a/cpp/core/shuffle/HashPartitioner.cc
+++ b/cpp/core/shuffle/HashPartitioner.cc
@@ -19,6 +19,24 @@
 
 namespace gluten {
 
+static int32_t computePid(const int32_t* pidArr, int64_t i, int32_t numPartitions) {
+  auto pid = pidArr[i] % numPartitions; // '%' keeps the dividend's sign; negatives are shifted up below.
+#if defined(__x86_64__)
+  int32_t tmp; // Branch-free fixup; `lea` writes tmp, so it must be an early-clobber output, not an input.
+  __asm__(
+      "lea (%[num_partitions],%[pid],1),%[tmp]\n"
+      "test %[pid],%[pid]\n"
+      "cmovs %[tmp],%[pid]\n"
+      : [pid] "+r"(pid), [tmp] "=&r"(tmp)
+      : [num_partitions] "r"(numPartitions));
+#else
+  if (pid < 0) {
+    pid += numPartitions;
+  }
+#endif
+  return pid;
+}
+
 arrow::Status gluten::HashPartitioner::compute(
     const int32_t* pidArr,
     const int64_t numRows,
@@ -28,20 +46,7 @@ arrow::Status gluten::HashPartitioner::compute(
   std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
 
   for (auto i = 0; i < numRows; ++i) {
-    auto pid = pidArr[i] % numPartitions_;
-#if defined(__x86_64__)
-    // force to generate ASM
-    __asm__(
-        "lea (%[num_partitions],%[pid],1),%[tmp]\n"
-        "test %[pid],%[pid]\n"
-        "cmovs %[tmp],%[pid]\n"
-        : [pid] "+r"(pid)
-        : [num_partitions] "r"(numPartitions_), [tmp] "r"(0));
-#else
-    if (pid < 0) {
-      pid += numPartitions_;
-    }
-#endif
+    auto pid = computePid(pidArr, i, numPartitions_);
     row2partition[i] = pid;
   }
 
@@ -52,4 +57,20 @@ arrow::Status gluten::HashPartitioner::compute(
   return arrow::Status::OK();
 }
 
+arrow::Status gluten::HashPartitioner::compute(
+    const int32_t* pidArr,
+    const int64_t numRows,
+    const int32_t vectorIndex,
+    std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+  auto index = static_cast<int64_t>(vectorIndex) << 32;
+  for (int64_t i = 0; i < numRows; ++i) {
+    auto pid = computePid(pidArr, i, numPartitions_);
+    int64_t combined = index | (i & 0xFFFFFFFFLL); // High 32 bits: vector index, low 32 bits: row index.
+    auto& vec = rowVectorIndexMap[pid];
+    vec.push_back(combined);
+  }
+
+  return arrow::Status::OK();
+}
+
 } // namespace gluten
diff --git a/cpp/core/shuffle/HashPartitioner.h 
b/cpp/core/shuffle/HashPartitioner.h
index fff01f939..6cd664634 100644
--- a/cpp/core/shuffle/HashPartitioner.h
+++ b/cpp/core/shuffle/HashPartitioner.h
@@ -30,6 +30,12 @@ class HashPartitioner final : public Partitioner {
       const int64_t numRows,
       std::vector<uint32_t>& row2partition,
       std::vector<uint32_t>& partition2RowCount) override;
+
+  arrow::Status compute(
+      const int32_t* pidArr,
+      const int64_t numRows,
+      const int32_t vectorIndex,
+      std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) 
override;
 };
 
 } // namespace gluten
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc 
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 0582ce0e5..2fa0b954f 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -541,6 +541,10 @@ arrow::Status LocalPartitionWriter::evict(
   return arrow::Status::OK();
 }
 
+arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t 
rawSize, const char* data, int64_t length) {
+  return arrow::Status::NotImplemented("Invalid code path for local shuffle 
writer: sort based is not supported.");
+}
+
 arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* 
actual) {
   // Finish last spiller.
   RETURN_NOT_OK(finishSpill());
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h 
b/cpp/core/shuffle/LocalPartitionWriter.h
index 2cf4f2fd9..c2bfacd4b 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -42,6 +42,8 @@ class LocalPartitionWriter : public PartitionWriter {
       bool reuseBuffers,
       bool hasComplexType) override;
 
+  arrow::Status evict(uint32_t partitionId, int64_t rawSize, const char* data, 
int64_t length) override;
+
   /// The stop function performs several tasks:
   /// 1. Opens the final data file.
   /// 2. Iterates over each partition ID (pid) to:
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index d8fe1c802..4317ed631 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -25,18 +25,24 @@
 namespace gluten {
 
 static constexpr int16_t kDefaultBatchSize = 4096;
-static constexpr int16_t kDefaultShuffleWriterBufferSize = 4096;
+static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
+static constexpr int64_t kDefaultSortBufferThreshold = 64'000'000'000;
+static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
 static constexpr int32_t kDefaultNumSubDirs = 64;
 static constexpr int32_t kDefaultCompressionThreshold = 100;
+inline const std::string kDefaultCompressionTypeStr = "lz4"; // C++17 inline: one shared definition, not a copy per TU.
 static constexpr int32_t kDefaultBufferAlignment = 64;
 static constexpr double kDefaultBufferReallocThreshold = 0.25;
 static constexpr double kDefaultMergeBufferThreshold = 0.25;
 static constexpr bool kEnableBufferedWrite = true;
 
+enum ShuffleWriterType { kHashShuffle, kSortShuffle };
 enum PartitionWriterType { kLocal, kRss };
 
 struct ShuffleReaderOptions {
   arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
+  std::string compressionTypeStr = kDefaultCompressionTypeStr;
+  ShuffleWriterType shuffleWriterType = kHashShuffle;
   CodecBackend codecBackend = CodecBackend::NONE;
   int32_t batchSize = kDefaultBatchSize;
 };
@@ -44,18 +50,20 @@ struct ShuffleReaderOptions {
 struct ShuffleWriterOptions {
   int32_t bufferSize = kDefaultShuffleWriterBufferSize;
   double bufferReallocThreshold = kDefaultBufferReallocThreshold;
+  int64_t pushMemoryThreshold = kDefaultPushMemoryThreshold;
   Partitioning partitioning = Partitioning::kRoundRobin;
   int64_t taskAttemptId = -1;
   int32_t startPartitionId = 0;
   int64_t threadId = -1;
+  ShuffleWriterType shuffleWriterType = kHashShuffle;
 };
 
 struct PartitionWriterOptions {
   int32_t mergeBufferSize = kDefaultShuffleWriterBufferSize;
   double mergeThreshold = kDefaultMergeBufferThreshold;
-
   int32_t compressionThreshold = kDefaultCompressionThreshold;
   arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
+  std::string compressionTypeStr = kDefaultCompressionTypeStr;
   CodecBackend codecBackend = CodecBackend::NONE;
   int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel;
   CompressionMode compressionMode = CompressionMode::BUFFER;
@@ -64,7 +72,9 @@ struct PartitionWriterOptions {
 
   int32_t numSubDirs = kDefaultNumSubDirs;
 
-  int32_t pushBufferMaxSize = kDefaultShuffleWriterBufferSize;
+  int64_t pushBufferMaxSize = kDefaultPushMemoryThreshold;
+
+  int64_t sortBufferMaxSize = kDefaultSortBufferThreshold;
 };
 
 struct ShuffleWriterMetrics {
diff --git a/cpp/core/shuffle/PartitionWriter.h 
b/cpp/core/shuffle/PartitionWriter.h
index 42e97cf06..93a6d04fe 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -49,10 +49,16 @@ class PartitionWriter : public Reclaimable {
       bool reuseBuffers,
       bool hasComplexType) = 0;
 
+  virtual arrow::Status evict(uint32_t partitionId, int64_t rawSize, const 
char* data, int64_t length) = 0;
+
   uint64_t cachedPayloadSize() {
     return payloadPool_->bytes_allocated();
   }
 
+  PartitionWriterOptions& options() {
+    return options_;
+  }
+
  protected:
   uint32_t numPartitions_;
   PartitionWriterOptions options_;
diff --git a/cpp/core/shuffle/Partitioner.h b/cpp/core/shuffle/Partitioner.h
index 8331b8a91..b233f5b82 100644
--- a/cpp/core/shuffle/Partitioner.h
+++ b/cpp/core/shuffle/Partitioner.h
@@ -18,7 +18,10 @@
 #pragma once
 
 #include <arrow/result.h>
+#include <folly/container/F14Map.h>
+
 #include <memory>
+#include <unordered_map>
 #include <vector>
 #include "shuffle/Partitioning.h"
 
@@ -40,6 +43,12 @@ class Partitioner {
       std::vector<uint32_t>& row2partition,
       std::vector<uint32_t>& partition2RowCount) = 0;
 
+  virtual arrow::Status compute(
+      const int32_t* pidArr,
+      const int64_t numRows,
+      const int32_t vectorIndex,
+      std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) = 
0;
+
  protected:
   Partitioner(int32_t numPartitions, bool hasPid) : 
numPartitions_(numPartitions), hasPid_(hasPid) {}
 
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.cc b/cpp/core/shuffle/RoundRobinPartitioner.cc
index b00680a18..196f9308d 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.cc
+++ b/cpp/core/shuffle/RoundRobinPartitioner.cc
@@ -39,4 +39,20 @@ arrow::Status gluten::RoundRobinPartitioner::compute(
   return arrow::Status::OK();
 }
 
+arrow::Status gluten::RoundRobinPartitioner::compute(
+    const int32_t* pidArr,
+    const int64_t numRows,
+    const int32_t vectorIndex,
+    std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+  auto index = static_cast<int64_t>(vectorIndex) << 32;
+  for (int64_t i = 0; i < numRows; ++i) {
+    int64_t combined = index | (i & 0xFFFFFFFFLL); // High 32 bits: vector index, low 32 bits: row index.
+    auto& vec = rowVectorIndexMap[pidSelection_];
+    vec.push_back(combined);
+    pidSelection_ = (pidSelection_ + 1) % numPartitions_;
+  }
+
+  return arrow::Status::OK();
+}
+
 } // namespace gluten
diff --git a/cpp/core/shuffle/RoundRobinPartitioner.h 
b/cpp/core/shuffle/RoundRobinPartitioner.h
index 5afd2832a..126a08eb9 100644
--- a/cpp/core/shuffle/RoundRobinPartitioner.h
+++ b/cpp/core/shuffle/RoundRobinPartitioner.h
@@ -32,6 +32,12 @@ class RoundRobinPartitioner final : public Partitioner {
       std::vector<uint32_t>& row2Partition,
       std::vector<uint32_t>& partition2RowCount) override;
 
+  arrow::Status compute(
+      const int32_t* pidArr,
+      const int64_t numRows,
+      const int32_t vectorIndex,
+      std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) 
override;
+
  private:
   friend class RoundRobinPartitionerTest;
 
diff --git a/cpp/core/shuffle/ShuffleReader.cc 
b/cpp/core/shuffle/ShuffleReader.cc
index 471409d6d..faa81b522 100644
--- a/cpp/core/shuffle/ShuffleReader.cc
+++ b/cpp/core/shuffle/ShuffleReader.cc
@@ -48,6 +48,10 @@ int64_t ShuffleReader::getIpcTime() const {
   return ipcTime_;
 }
 
+ShuffleWriterType ShuffleReader::getShuffleWriterType() const {
+  return factory_->getShuffleWriterType();
+}
+
 int64_t ShuffleReader::getDeserializeTime() const {
   return factory_->getDeserializeTime();
 }
diff --git a/cpp/core/shuffle/ShuffleReader.h b/cpp/core/shuffle/ShuffleReader.h
index 4ba105712..5cef14768 100644
--- a/cpp/core/shuffle/ShuffleReader.h
+++ b/cpp/core/shuffle/ShuffleReader.h
@@ -39,6 +39,8 @@ class DeserializerFactory {
   virtual int64_t getDecompressTime() = 0;
 
   virtual int64_t getDeserializeTime() = 0;
+
+  virtual ShuffleWriterType getShuffleWriterType() = 0;
 };
 
 class ShuffleReader {
@@ -60,13 +62,15 @@ class ShuffleReader {
 
   arrow::MemoryPool* getPool() const;
 
+  ShuffleWriterType getShuffleWriterType() const;
+
  protected:
   arrow::MemoryPool* pool_;
   int64_t decompressTime_ = 0;
   int64_t ipcTime_ = 0;
   int64_t deserializeTime_ = 0;
 
-  ShuffleReaderOptions options_;
+  ShuffleWriterType shuffleWriterType_;
 
  private:
   std::shared_ptr<arrow::Schema> schema_;
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index bcf0c2c3b..a7987ce3e 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -37,7 +37,7 @@ class ShuffleWriter : public Reclaimable {
  public:
   static constexpr int64_t kMinMemLimit = 128LL * 1024 * 1024;
 
-  virtual arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) = 0;
+  virtual arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) = 0;
 
   virtual arrow::Status stop() = 0;
 
@@ -45,6 +45,10 @@ class ShuffleWriter : public Reclaimable {
     return numPartitions_;
   }
 
+  ShuffleWriterOptions& options() {
+    return options_;
+  }
+
   int64_t partitionBufferSize() const {
     return partitionBufferPool_->bytes_allocated();
   }
@@ -81,7 +85,9 @@ class ShuffleWriter : public Reclaimable {
     return metrics_.rawPartitionLengths;
   }
 
-  virtual const uint64_t cachedPayloadSize() const = 0;
+  int64_t rawPartitionBytes() const { // Read-only sum of all raw partition lengths.
+    return std::accumulate(metrics_.rawPartitionLengths.begin(), metrics_.rawPartitionLengths.end(), int64_t{0});
+  }
 
  protected:
   ShuffleWriter(
@@ -108,6 +114,8 @@ class ShuffleWriter : public Reclaimable {
 
   std::unique_ptr<PartitionWriter> partitionWriter_;
 
+  std::vector<int64_t> rowVectorLengths_;
+
   std::shared_ptr<arrow::Schema> schema_;
 
   // Column index, partition id, buffers.
diff --git a/cpp/core/shuffle/SinglePartitioner.cc b/cpp/core/shuffle/SinglePartitioner.cc
index c4f80ce79..981a5b8e4 100644
--- a/cpp/core/shuffle/SinglePartitioner.cc
+++ b/cpp/core/shuffle/SinglePartitioner.cc
@@ -28,4 +28,13 @@ arrow::Status gluten::SinglePartitioner::compute(
   return arrow::Status::OK();
 }
 
+arrow::Status gluten::SinglePartitioner::compute(
+    const int32_t* pidArr,
+    const int64_t numRows,
+    const int32_t vectorIndex,
+    std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
+  // Nothing needs to be done here; rowVectorIndexMap is intentionally left untouched.
+  return arrow::Status::OK();
+}
+
 } // namespace gluten
diff --git a/cpp/core/shuffle/SinglePartitioner.h 
b/cpp/core/shuffle/SinglePartitioner.h
index d3d2c29f7..e5d7a920f 100644
--- a/cpp/core/shuffle/SinglePartitioner.h
+++ b/cpp/core/shuffle/SinglePartitioner.h
@@ -29,5 +29,11 @@ class SinglePartitioner final : public Partitioner {
       const int64_t numRows,
       std::vector<uint32_t>& row2partition,
       std::vector<uint32_t>& partition2RowCount) override;
+
+  arrow::Status compute(
+      const int32_t* pidArr,
+      const int64_t numRows,
+      const int32_t vectorIndex,
+      std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) 
override;
 };
 } // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssClient.h b/cpp/core/shuffle/rss/RssClient.h
index 9209430b0..dddccfa1a 100644
--- a/cpp/core/shuffle/rss/RssClient.h
+++ b/cpp/core/shuffle/rss/RssClient.h
@@ -21,7 +21,7 @@ class RssClient {
  public:
   virtual ~RssClient() = default;
 
-  virtual int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t 
size) = 0;
+  virtual int32_t pushPartitionData(int32_t partitionId, const char* bytes, 
int64_t size) = 0;
 
   virtual void stop() = 0;
 };
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc 
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 15981bf8d..015129e26 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -73,4 +73,11 @@ arrow::Status RssPartitionWriter::evict(
       partitionId, 
reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
   return arrow::Status::OK();
 }
+
+arrow::Status RssPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, 
const char* data, int64_t length) {
+  rawPartitionLengths_[partitionId] += rawSize;
+  ScopedTimer timer(&spillTime_);
+  bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, 
data, length);
+  return arrow::Status::OK();
+}
 } // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h 
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index ef43017fc..b8cc1551c 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -44,6 +44,8 @@ class RssPartitionWriter final : public RemotePartitionWriter 
{
       bool reuseBuffers,
       bool hasComplexType) override;
 
+  arrow::Status evict(uint32_t partitionId, int64_t rawSize, const char* data, 
int64_t length) override;
+
   arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
 
   arrow::Status stop(ShuffleWriterMetrics* metrics) override;
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 9e5f08b1c..c058883b6 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -310,7 +310,8 @@ set(VELOX_SRCS
     operators/serializer/VeloxRowToColumnarConverter.cc
     operators/writer/VeloxParquetDatasource.cc
     shuffle/VeloxShuffleReader.cc
-    shuffle/VeloxShuffleWriter.cc
+    shuffle/VeloxHashBasedShuffleWriter.cc
+    shuffle/VeloxSortBasedShuffleWriter.cc
     substrait/SubstraitParser.cc
     substrait/SubstraitToVeloxExpr.cc
     substrait/SubstraitToVeloxPlan.cc
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 14593c8df..71d3d96b5 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -31,6 +31,7 @@
 #include "compute/VeloxRuntime.h"
 #include "config/GlutenConfig.h"
 #include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
 #include "shuffle/VeloxShuffleWriter.h"
 #include "shuffle/rss/RssPartitionWriter.h"
 #include "utils/StringUtil.h"
@@ -111,7 +112,7 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
   options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
   GLUTEN_ASSIGN_OR_THROW(
       auto shuffleWriter,
-      VeloxShuffleWriter::create(
+      VeloxHashBasedShuffleWriter::create(
           FLAGS_shuffle_partitions,
           std::move(partitionWriter),
           std::move(options),
@@ -191,7 +192,7 @@ auto BM_Generic = [](::benchmark::State& state,
       GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, 
isFromEnv));
       const auto& shuffleWriter = createShuffleWriter(memoryManager.get(), 
dataFile, localDirs);
       while (resultIter->hasNext()) {
-        GLUTEN_THROW_NOT_OK(shuffleWriter->split(resultIter->next(), 
ShuffleWriter::kMinMemLimit));
+        GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), 
ShuffleWriter::kMinMemLimit));
       }
       GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
       TIME_NANO_END(shuffleWriteTime);
diff --git a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc 
b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
index 0109de603..4a4bb69b8 100644
--- a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
+++ b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc
@@ -31,6 +31,7 @@
 #include "benchmarks/common/BenchmarkUtils.h"
 #include "memory/ColumnarBatch.h"
 #include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
 #include "shuffle/VeloxShuffleWriter.h"
 #include "utils/TestUtils.h"
 #include "utils/VeloxArrowUtils.h"
@@ -259,7 +260,7 @@ class BenchmarkShuffleSplitCacheScanBenchmark : public 
BenchmarkShuffleSplit {
         numPartitions, PartitionWriterOptions{}, 
defaultArrowMemoryPool().get(), dataFile, localDirs);
     GLUTEN_ASSIGN_OR_THROW(
         shuffleWriter,
-        VeloxShuffleWriter::create(
+        VeloxHashBasedShuffleWriter::create(
             numPartitions,
             std::move(partitionWriter),
             std::move(options),
@@ -294,7 +295,7 @@ class BenchmarkShuffleSplitCacheScanBenchmark : public 
BenchmarkShuffleSplit {
           [&shuffleWriter, &splitTime](const 
std::shared_ptr<arrow::RecordBatch>& recordBatch) {
             std::shared_ptr<ColumnarBatch> cb;
             ARROW_ASSIGN_OR_THROW(cb, 
recordBatch2VeloxColumnarBatch(*recordBatch));
-            TIME_NANO_OR_THROW(splitTime, shuffleWriter->split(cb, 
ShuffleWriter::kMinMemLimit));
+            TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb, 
ShuffleWriter::kMinMemLimit));
           });
       // LOG(INFO) << " split done memory allocated = " <<
       // options.memoryPool->bytes_allocated();
@@ -327,7 +328,7 @@ class BenchmarkShuffleSplitIterateScanBenchmark : public 
BenchmarkShuffleSplit {
         numPartitions, PartitionWriterOptions{}, 
defaultArrowMemoryPool().get(), dataFile, localDirs);
     GLUTEN_ASSIGN_OR_THROW(
         shuffleWriter,
-        VeloxShuffleWriter::create(
+        VeloxHashBasedShuffleWriter::create(
             numPartitions,
             std::move(partitionWriter),
             std::move(options),
@@ -350,7 +351,7 @@ class BenchmarkShuffleSplitIterateScanBenchmark : public 
BenchmarkShuffleSplit {
         numRows += recordBatch->num_rows();
         std::shared_ptr<ColumnarBatch> cb;
         ARROW_ASSIGN_OR_THROW(cb, 
recordBatch2VeloxColumnarBatch(*recordBatch));
-        TIME_NANO_OR_THROW(splitTime, shuffleWriter->split(cb, 
ShuffleWriter::kMinMemLimit));
+        TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb, 
ShuffleWriter::kMinMemLimit));
         TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
       }
     }
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index a3e8c159c..15c84b41c 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -28,8 +28,9 @@
 #include "compute/VeloxPlanConverter.h"
 #include "config/VeloxConfig.h"
 #include "operators/serializer/VeloxRowToColumnarConverter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
 #include "shuffle/VeloxShuffleReader.h"
-#include "shuffle/VeloxShuffleWriter.h"
+#include "shuffle/VeloxSortBasedShuffleWriter.h"
 #include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 
@@ -187,10 +188,19 @@ std::shared_ptr<ShuffleWriter> 
VeloxRuntime::createShuffleWriter(
     MemoryManager* memoryManager) {
   auto ctxPool = getLeafVeloxPool(memoryManager);
   auto arrowPool = memoryManager->getArrowMemoryPool();
-  GLUTEN_ASSIGN_OR_THROW(
-      auto shuffle_writer,
-      VeloxShuffleWriter::create(numPartitions, std::move(partitionWriter), 
std::move(options), ctxPool, arrowPool));
-  return shuffle_writer;
+  std::shared_ptr<ShuffleWriter> shuffleWriter;
+  if (options.shuffleWriterType == kHashShuffle) {
+    GLUTEN_ASSIGN_OR_THROW(
+        shuffleWriter,
+        VeloxHashBasedShuffleWriter::create(
+            numPartitions, std::move(partitionWriter), std::move(options), 
ctxPool, arrowPool));
+  } else if (options.shuffleWriterType == kSortShuffle) {
+    GLUTEN_ASSIGN_OR_THROW(
+        shuffleWriter,
+        VeloxSortBasedShuffleWriter::create(
+            numPartitions, std::move(partitionWriter), std::move(options), 
ctxPool, arrowPool));
+  }
+  return shuffleWriter;
 }
 
 std::shared_ptr<Datasource> VeloxRuntime::createDatasource(
@@ -242,9 +252,18 @@ std::shared_ptr<ShuffleReader> 
VeloxRuntime::createShuffleReader(
   auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
   auto codec = gluten::createArrowIpcCodec(options.compressionType, 
options.codecBackend);
   auto ctxVeloxPool = getLeafVeloxPool(memoryManager);
+  auto veloxCompressionType = 
facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
   auto deserializerFactory = 
std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>(
-      schema, std::move(codec), rowType, options.batchSize, pool, 
ctxVeloxPool);
-  return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+      schema,
+      std::move(codec),
+      veloxCompressionType,
+      rowType,
+      options.batchSize,
+      pool,
+      ctxVeloxPool,
+      options.shuffleWriterType);
+  auto reader = 
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+  return reader;
 }
 
 std::unique_ptr<ColumnarBatchSerializer> 
VeloxRuntime::createColumnarBatchSerializer(
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
similarity index 90%
rename from cpp/velox/shuffle/VeloxShuffleWriter.cc
rename to cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index b304565e5..daff13703 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-#include "VeloxShuffleWriter.h"
+#include "VeloxHashBasedShuffleWriter.h"
 #include "memory/ArrowMemory.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "memory/VeloxMemoryManager.h"
@@ -70,58 +70,6 @@ bool vectorHasNull(const facebook::velox::VectorPtr& vp) {
   return vp->countNulls(vp->nulls(), vp->size()) != 0;
 }
 
-facebook::velox::RowVectorPtr getStrippedRowVector(const 
facebook::velox::RowVector& rv) {
-  // get new row type
-  auto rowType = rv.type()->asRow();
-  auto typeChildren = rowType.children();
-  typeChildren.erase(typeChildren.begin());
-  auto newRowType = facebook::velox::ROW(std::move(typeChildren));
-
-  // get length
-  auto length = rv.size();
-
-  // get children
-  auto children = rv.children();
-  children.erase(children.begin());
-
-  return std::make_shared<facebook::velox::RowVector>(
-      rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), length, 
std::move(children));
-}
-
-const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) {
-  VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column.");
-
-  auto& firstChild = rv.childAt(0);
-  VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not 
flat encoding.");
-  VELOX_CHECK(
-      firstChild->type()->isInteger(),
-      "Partition id (field 0) should be integer, but got {}",
-      firstChild->type()->toString());
-
-  // first column is partition key hash value or pid
-  return firstChild->asFlatVector<int32_t>()->rawValues();
-}
-
-class EvictGuard {
- public:
-  explicit EvictGuard(EvictState& evictState) : evictState_(evictState) {
-    evictState_ = EvictState::kUnevictable;
-  }
-
-  ~EvictGuard() {
-    evictState_ = EvictState::kEvictable;
-  }
-
-  // For safety and clarity.
-  EvictGuard(const EvictGuard&) = delete;
-  EvictGuard& operator=(const EvictGuard&) = delete;
-  EvictGuard(EvictGuard&&) = delete;
-  EvictGuard& operator=(EvictGuard&&) = delete;
-
- private:
-  EvictState& evictState_;
-};
-
 class BinaryArrayResizeGuard {
  public:
   explicit BinaryArrayResizeGuard(BinaryArrayResizeState& state) : 
state_(state) {
@@ -199,19 +147,19 @@ arrow::Status 
collectFlatVectorBuffer<facebook::velox::TypeKind::VARBINARY>(
 
 } // namespace
 
-arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxShuffleWriter::create(
+arrow::Result<std::shared_ptr<VeloxShuffleWriter>> 
VeloxHashBasedShuffleWriter::create(
     uint32_t numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
     ShuffleWriterOptions options,
     std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
     arrow::MemoryPool* arrowPool) {
-  std::shared_ptr<VeloxShuffleWriter> res(
-      new VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), veloxPool, arrowPool));
+  std::shared_ptr<VeloxHashBasedShuffleWriter> res(new 
VeloxHashBasedShuffleWriter(
+      numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool));
   RETURN_NOT_OK(res->init());
   return res;
 } // namespace gluten
 
-arrow::Status VeloxShuffleWriter::init() {
+arrow::Status VeloxHashBasedShuffleWriter::init() {
 #if defined(__x86_64__)
   supportAvx512_ = __builtin_cpu_supports("avx512bw");
 #else
@@ -235,7 +183,7 @@ arrow::Status VeloxShuffleWriter::init() {
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::initPartitions() {
+arrow::Status VeloxHashBasedShuffleWriter::initPartitions() {
   auto simpleColumnCount = simpleColumnIndices_.size();
 
   partitionValidityAddrs_.resize(simpleColumnCount);
@@ -260,15 +208,11 @@ arrow::Status VeloxShuffleWriter::initPartitions() {
   return arrow::Status::OK();
 }
 
-int64_t VeloxShuffleWriter::rawPartitionBytes() const {
-  return std::accumulate(metrics_.rawPartitionLengths.begin(), 
metrics_.rawPartitionLengths.end(), 0LL);
-}
-
-void VeloxShuffleWriter::setPartitionBufferSize(uint32_t newSize) {
+void VeloxHashBasedShuffleWriter::setPartitionBufferSize(uint32_t newSize) {
   options_.bufferSize = newSize;
 }
 
-arrow::Result<std::shared_ptr<arrow::Buffer>> 
VeloxShuffleWriter::generateComplexTypeBuffers(
+arrow::Result<std::shared_ptr<arrow::Buffer>> 
VeloxHashBasedShuffleWriter::generateComplexTypeBuffers(
     facebook::velox::RowVectorPtr vector) {
   auto arena = 
std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
   auto serializer =
@@ -291,7 +235,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> 
VeloxShuffleWriter::generateComple
   return valueBuffer;
 }
 
-arrow::Status VeloxShuffleWriter::split(std::shared_ptr<ColumnarBatch> cb, 
int64_t memLimit) {
+arrow::Status 
VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t 
memLimit) {
   if (options_.partitioning == Partitioning::kSingle) {
     auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
     VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -357,7 +301,7 @@ arrow::Status 
VeloxShuffleWriter::split(std::shared_ptr<ColumnarBatch> cb, int64
   return arrow::Status::OK();
 }
 
-arrow::Status 
VeloxShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, 
int64_t memLimit) {
+arrow::Status 
VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVectorPtr
 rv, int64_t memLimit) {
   if (partitioner_->hasPid()) {
     auto pidArr = getFirstColumn(*rv);
     START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
@@ -376,7 +320,7 @@ arrow::Status 
VeloxShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVec
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::stop() {
+arrow::Status VeloxHashBasedShuffleWriter::stop() {
   if (options_.partitioning != Partitioning::kSingle) {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
       RETURN_NOT_OK(evictPartitionBuffers(pid, false));
@@ -394,7 +338,7 @@ arrow::Status VeloxShuffleWriter::stop() {
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::buildPartition2Row(uint32_t rowNum) {
+arrow::Status VeloxHashBasedShuffleWriter::buildPartition2Row(uint32_t rowNum) 
{
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingBuildPartition]);
 
   // calc partition2RowOffsetBase_
@@ -427,7 +371,7 @@ arrow::Status 
VeloxShuffleWriter::buildPartition2Row(uint32_t rowNum) {
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::updateInputHasNull(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::updateInputHasNull(const 
facebook::velox::RowVector& rv) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingHasNull]);
 
   for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) {
@@ -444,11 +388,11 @@ arrow::Status 
VeloxShuffleWriter::updateInputHasNull(const facebook::velox::RowV
   return arrow::Status::OK();
 }
 
-void VeloxShuffleWriter::setSplitState(SplitState state) {
+void VeloxHashBasedShuffleWriter::setSplitState(SplitState state) {
   splitState_ = state;
 }
 
-arrow::Status VeloxShuffleWriter::doSplit(const facebook::velox::RowVector& 
rv, int64_t memLimit) {
+arrow::Status VeloxHashBasedShuffleWriter::doSplit(const 
facebook::velox::RowVector& rv, int64_t memLimit) {
   auto rowNum = rv.size();
   RETURN_NOT_OK(buildPartition2Row(rowNum));
   RETURN_NOT_OK(updateInputHasNull(rv));
@@ -472,7 +416,7 @@ arrow::Status VeloxShuffleWriter::doSplit(const 
facebook::velox::RowVector& rv,
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::splitRowVector(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitRowVector(const 
facebook::velox::RowVector& rv) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]);
 
   // now start to split the RowVector
@@ -489,7 +433,7 @@ arrow::Status VeloxShuffleWriter::splitRowVector(const 
facebook::velox::RowVecto
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitFixedWidthValueBuffer(const 
facebook::velox::RowVector& rv) {
   for (auto col = 0; col < fixedWidthColumnCount_; ++col) {
     auto colIdx = simpleColumnIndices_[col];
     auto& column = rv.childAt(colIdx);
@@ -543,7 +487,9 @@ arrow::Status 
VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::splitBoolType(const uint8_t* srcAddr, const 
std::vector<uint8_t*>& dstAddrs) {
+arrow::Status VeloxHashBasedShuffleWriter::splitBoolType(
+    const uint8_t* srcAddr,
+    const std::vector<uint8_t*>& dstAddrs) {
   // assume batch size = 32k; reducer# = 4K; row/reducer = 8
   for (auto& pid : partitionUsed_) {
     // set the last byte
@@ -632,7 +578,7 @@ arrow::Status VeloxShuffleWriter::splitBoolType(const 
uint8_t* srcAddr, const st
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::splitValidityBuffer(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitValidityBuffer(const 
facebook::velox::RowVector& rv) {
   for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) {
     auto colIdx = simpleColumnIndices_[col];
     auto& column = rv.childAt(colIdx);
@@ -660,7 +606,7 @@ arrow::Status VeloxShuffleWriter::splitValidityBuffer(const 
facebook::velox::Row
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::splitBinaryType(
+arrow::Status VeloxHashBasedShuffleWriter::splitBinaryType(
     uint32_t binaryIdx,
     const facebook::velox::FlatVector<facebook::velox::StringView>& src,
     std::vector<BinaryBuf>& dst) {
@@ -723,7 +669,7 @@ arrow::Status VeloxShuffleWriter::splitBinaryType(
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::splitBinaryArray(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitBinaryArray(const 
facebook::velox::RowVector& rv) {
   for (auto col = fixedWidthColumnCount_; col < simpleColumnIndices_.size(); 
++col) {
     auto binaryIdx = col - fixedWidthColumnCount_;
     auto& dstAddrs = partitionBinaryAddrs_[binaryIdx];
@@ -734,7 +680,7 @@ arrow::Status VeloxShuffleWriter::splitBinaryArray(const 
facebook::velox::RowVec
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::splitComplexType(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::splitComplexType(const 
facebook::velox::RowVector& rv) {
   if (complexColumnIndices_.size() == 0) {
     return arrow::Status::OK();
   }
@@ -773,7 +719,7 @@ arrow::Status VeloxShuffleWriter::splitComplexType(const 
facebook::velox::RowVec
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::initColumnTypes(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::initColumnTypes(const 
facebook::velox::RowVector& rv) {
   schema_ = toArrowSchema(rv.type(), veloxPool_.get());
   for (size_t i = 0; i < rv.childrenSize(); ++i) {
     veloxColumnTypes_.push_back(rv.childAt(i)->type());
@@ -837,7 +783,7 @@ arrow::Status VeloxShuffleWriter::initColumnTypes(const 
facebook::velox::RowVect
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::initFromRowVector(const 
facebook::velox::RowVector& rv) {
+arrow::Status VeloxHashBasedShuffleWriter::initFromRowVector(const 
facebook::velox::RowVector& rv) {
   if (veloxColumnTypes_.empty()) {
     RETURN_NOT_OK(initColumnTypes(rv));
     RETURN_NOT_OK(initPartitions());
@@ -846,13 +792,13 @@ arrow::Status VeloxShuffleWriter::initFromRowVector(const 
facebook::velox::RowVe
   return arrow::Status::OK();
 }
 
-inline bool VeloxShuffleWriter::beyondThreshold(uint32_t partitionId, uint32_t 
newSize) {
+inline bool VeloxHashBasedShuffleWriter::beyondThreshold(uint32_t partitionId, 
uint32_t newSize) {
   auto currentBufferSize = partitionBufferSize_[partitionId];
   return newSize > (1 + options_.bufferReallocThreshold) * currentBufferSize ||
       newSize < (1 - options_.bufferReallocThreshold) * currentBufferSize;
 }
 
-void VeloxShuffleWriter::calculateSimpleColumnBytes() {
+void VeloxHashBasedShuffleWriter::calculateSimpleColumnBytes() {
   fixedWidthBufferBytes_ = 0;
   for (size_t col = 0; col < fixedWidthColumnCount_; ++col) {
     auto colIdx = simpleColumnIndices_[col];
@@ -862,7 +808,9 @@ void VeloxShuffleWriter::calculateSimpleColumnBytes() {
   fixedWidthBufferBytes_ += kSizeOfBinaryArrayLengthBuffer * 
binaryColumnIndices_.size();
 }
 
-uint32_t VeloxShuffleWriter::calculatePartitionBufferSize(const 
facebook::velox::RowVector& rv, int64_t memLimit) {
+uint32_t VeloxHashBasedShuffleWriter::calculatePartitionBufferSize(
+    const facebook::velox::RowVector& rv,
+    int64_t memLimit) {
   auto bytesPerRow = fixedWidthBufferBytes_;
 
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCalculateBufferSize]);
@@ -915,7 +863,7 @@ uint32_t 
VeloxShuffleWriter::calculatePartitionBufferSize(const facebook::velox:
 }
 
 arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
-VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId, 
uint32_t newSize) {
+VeloxHashBasedShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t 
partitionId, uint32_t newSize) {
   if (inputHasNull_[col]) {
     ARROW_ASSIGN_OR_RAISE(
         auto validityBuffer,
@@ -929,7 +877,7 @@ VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, 
uint32_t partitionId, u
   return nullptr;
 }
 
-arrow::Status VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, 
uint32_t newSize) {
+arrow::Status VeloxHashBasedShuffleWriter::updateValidityBuffers(uint32_t 
partitionId, uint32_t newSize) {
   for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
     // If the validity buffer is not yet allocated, allocate and fill 0xff 
based on inputHasNull_.
     if (partitionValidityAddrs_[i][partitionId] == nullptr) {
@@ -940,7 +888,7 @@ arrow::Status 
VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, ui
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t 
partitionId, uint32_t newSize) {
+arrow::Status VeloxHashBasedShuffleWriter::allocatePartitionBuffer(uint32_t 
partitionId, uint32_t newSize) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingAllocateBuffer]);
 
   for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
@@ -987,7 +935,7 @@ arrow::Status 
VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId,
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::evictBuffers(
+arrow::Status VeloxHashBasedShuffleWriter::evictBuffers(
     uint32_t partitionId,
     uint32_t numRows,
     std::vector<std::shared_ptr<arrow::Buffer>> buffers,
@@ -1000,7 +948,7 @@ arrow::Status VeloxShuffleWriter::evictBuffers(
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::evictPartitionBuffers(uint32_t partitionId, 
bool reuseBuffers) {
+arrow::Status VeloxHashBasedShuffleWriter::evictPartitionBuffers(uint32_t 
partitionId, bool reuseBuffers) {
   auto numRows = partitionBufferBase_[partitionId];
   if (numRows > 0) {
     ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(partitionId, 
reuseBuffers));
@@ -1009,7 +957,7 @@ arrow::Status 
VeloxShuffleWriter::evictPartitionBuffers(uint32_t partitionId, bo
   return arrow::Status::OK();
 }
 
-arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
VeloxShuffleWriter::assembleBuffers(
+arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
VeloxHashBasedShuffleWriter::assembleBuffers(
     uint32_t partitionId,
     bool reuseBuffers) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]);
@@ -1150,7 +1098,7 @@ 
arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxShuffleWriter::a
   return allBuffers;
 }
 
-arrow::Status VeloxShuffleWriter::reclaimFixedSize(int64_t size, int64_t* 
actual) {
+arrow::Status VeloxHashBasedShuffleWriter::reclaimFixedSize(int64_t size, 
int64_t* actual) {
   if (evictState_ == EvictState::kUnevictable) {
     *actual = 0;
     return arrow::Status::OK();
@@ -1174,7 +1122,7 @@ arrow::Status 
VeloxShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual
   return arrow::Status::OK();
 }
 
-arrow::Result<int64_t> VeloxShuffleWriter::evictCachedPayload(int64_t size) {
+arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictCachedPayload(int64_t 
size) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingEvictPartition]);
   int64_t actual;
   auto before = partitionBufferPool_->bytes_allocated();
@@ -1188,7 +1136,7 @@ arrow::Result<int64_t> 
VeloxShuffleWriter::evictCachedPayload(int64_t size) {
   return actual;
 }
 
-arrow::Status VeloxShuffleWriter::resetValidityBuffer(uint32_t partitionId) {
+arrow::Status VeloxHashBasedShuffleWriter::resetValidityBuffer(uint32_t 
partitionId) {
   std::for_each(partitionBuffers_.begin(), partitionBuffers_.end(), 
[partitionId](auto& bufs) {
     if (bufs[partitionId].size() != 0 && 
bufs[partitionId][kValidityBufferIndex] != nullptr) {
       // initialize all true once allocated
@@ -1199,7 +1147,8 @@ arrow::Status 
VeloxShuffleWriter::resetValidityBuffer(uint32_t partitionId) {
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, 
uint32_t newSize, bool preserveData) {
+arrow::Status
+VeloxHashBasedShuffleWriter::resizePartitionBuffer(uint32_t partitionId, 
uint32_t newSize, bool preserveData) {
   for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
     auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
     auto& buffers = partitionBuffers_[i][partitionId];
@@ -1278,7 +1227,7 @@ arrow::Status 
VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, ui
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) {
+arrow::Status VeloxHashBasedShuffleWriter::shrinkPartitionBuffer(uint32_t 
partitionId) {
   auto bufferSize = partitionBufferSize_[partitionId];
   if (bufferSize == 0) {
     return arrow::Status::OK();
@@ -1301,11 +1250,11 @@ arrow::Status 
VeloxShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) {
   return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true);
 }
 
-uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, 
uint32_t newSize) {
+uint64_t VeloxHashBasedShuffleWriter::valueBufferSizeForBinaryArray(uint32_t 
binaryIdx, uint32_t newSize) {
   return (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) / 
totalInputNumRows_ * newSize + 1024;
 }
 
-uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t 
fixedWidthIndex, uint32_t newSize) {
+uint64_t 
VeloxHashBasedShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t 
fixedWidthIndex, uint32_t newSize) {
   uint64_t valueBufferSize = 0;
   auto columnIdx = simpleColumnIndices_[fixedWidthIndex];
   if (arrowColumnTypes_[columnIdx]->id() == arrow::BooleanType::type_id) {
@@ -1320,7 +1269,7 @@ uint64_t 
VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWid
   return valueBufferSize;
 }
 
-void VeloxShuffleWriter::stat() const {
+void VeloxHashBasedShuffleWriter::stat() const {
 #if VELOX_SHUFFLE_WRITER_LOG_FLAG
   for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
     std::ostringstream oss;
@@ -1336,7 +1285,7 @@ void VeloxShuffleWriter::stat() const {
 #endif
 }
 
-arrow::Status VeloxShuffleWriter::resetPartitionBuffer(uint32_t partitionId) {
+arrow::Status VeloxHashBasedShuffleWriter::resetPartitionBuffer(uint32_t 
partitionId) {
   // Reset fixed-width partition buffers
   for (auto i = 0; i < fixedWidthColumnCount_; ++i) {
     partitionValidityAddrs_[i][partitionId] = nullptr;
@@ -1356,11 +1305,11 @@ arrow::Status 
VeloxShuffleWriter::resetPartitionBuffer(uint32_t partitionId) {
   return arrow::Status::OK();
 }
 
-const uint64_t VeloxShuffleWriter::cachedPayloadSize() const {
+const uint64_t VeloxHashBasedShuffleWriter::cachedPayloadSize() const {
   return partitionWriter_->cachedPayloadSize();
 }
 
-arrow::Result<int64_t> 
VeloxShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) {
+arrow::Result<int64_t> 
VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) {
   // Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_)
   std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
   for (auto pid = 0; pid < numPartitions_; ++pid) {
@@ -1388,7 +1337,7 @@ arrow::Result<int64_t> 
VeloxShuffleWriter::shrinkPartitionBuffersMinSize(int64_t
   return shrunken;
 }
 
-arrow::Result<int64_t> 
VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
+arrow::Result<int64_t> 
VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
   // Evict partition buffers, only when splitState_ == SplitState::kInit, and 
space freed from
   // shrinking is not enough. In this case partitionBufferSize_ == 
partitionBufferBase_
   int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
@@ -1415,7 +1364,7 @@ arrow::Result<int64_t> 
VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t
   return evicted;
 }
 
-bool VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
+bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
   // If OOM happens during SplitState::kSplit, it is triggered by binary 
buffers resize.
   // Or during SplitState::kInit, it is triggered by other operators.
   // The reclaim order is spill->shrink, because the partition buffers can be 
reused.
@@ -1424,13 +1373,13 @@ bool 
VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
       (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
 }
 
-bool VeloxShuffleWriter::evictPartitionBuffersAfterSpill() const {
+bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const {
   // If OOM triggered by other operators, the splitState_ is SplitState::kInit.
   // The last resort is to evict the partition buffers to reclaim more space.
   return options_.partitioning != Partitioning::kSingle && splitState_ == 
SplitState::kInit;
 }
 
-arrow::Result<uint32_t> 
VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const {
+arrow::Result<uint32_t> 
VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShrink(uint32_t 
partitionId) const {
   if (splitState_ == SplitState::kSplit) {
     return partitionBufferBase_[partitionId] + 
partition2RowCount_[partitionId];
   }
@@ -1440,7 +1389,7 @@ arrow::Result<uint32_t> 
VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint3
   return arrow::Status::Invalid("Cannot shrink partition buffers in 
SplitState: " + std::to_string(splitState_));
 }
 
-arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t 
preAllocBufferSize) {
+arrow::Status VeloxHashBasedShuffleWriter::preAllocPartitionBuffers(uint32_t 
preAllocBufferSize) {
   for (auto& pid : partitionUsed_) {
     auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]);
     VLOG_IF(9, partitionBufferSize_[pid] != newSize)
@@ -1494,7 +1443,7 @@ arrow::Status 
VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBuff
   return arrow::Status::OK();
 }
 
-bool VeloxShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr& 
rv) const {
+bool 
VeloxHashBasedShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr&
 rv) const {
   return (rv->size() > maxBatchSize_ && maxBatchSize_ > 0);
 }
 
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h 
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
similarity index 83%
copy from cpp/velox/shuffle/VeloxShuffleWriter.h
copy to cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index e699a323b..a11f84e95 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -36,10 +36,10 @@
 #include <arrow/result.h>
 #include <arrow/type.h>
 
+#include "VeloxShuffleWriter.h"
 #include "memory/VeloxMemoryManager.h"
 #include "shuffle/PartitionWriter.h"
 #include "shuffle/Partitioner.h"
-#include "shuffle/ShuffleWriter.h"
 #include "shuffle/Utils.h"
 
 #include "utils/Print.h"
@@ -88,7 +88,6 @@ namespace gluten {
 #endif // end of VELOX_SHUFFLE_WRITER_PRINT
 
 enum SplitState { kInit, kPreAlloc, kSplit, kStop };
-enum EvictState { kEvictable, kUnevictable };
 
 struct BinaryArrayResizeState {
   bool inResize;
@@ -100,7 +99,7 @@ struct BinaryArrayResizeState {
       : inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {}
 };
 
-class VeloxShuffleWriter final : public ShuffleWriter {
+class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
   enum {
     kValidityBufferIndex = 0,
     kFixedWidthValueBufferIndex = 1,
@@ -130,7 +129,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       arrow::MemoryPool* arrowPool);
 
-  arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) 
override;
+  arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) 
override;
 
   arrow::Status stop() override;
 
@@ -138,12 +137,10 @@ class VeloxShuffleWriter final : public ShuffleWriter {
 
   const uint64_t cachedPayloadSize() const override;
 
-  arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers);
-
-  int64_t rawPartitionBytes() const;
+  arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) 
override;
 
   // For test only.
-  void setPartitionBufferSize(uint32_t newSize);
+  void setPartitionBufferSize(uint32_t newSize) override;
 
   // for debugging
   void printColumnsInfo() const {
@@ -192,22 +189,14 @@ class VeloxShuffleWriter final : public ShuffleWriter {
     VS_PRINT_CONTAINER(input_has_null_);
   }
 
-  int32_t maxBatchSize() const {
-    return maxBatchSize_;
-  }
-
  private:
-  VeloxShuffleWriter(
+  VeloxHashBasedShuffleWriter(
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       arrow::MemoryPool* pool)
-      : ShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), pool),
-        veloxPool_(std::move(veloxPool)) {
-    arenas_.resize(numPartitions);
-    serdeOptions_.useLosslessTimestamp = true;
-  }
+      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), std::move(veloxPool), pool) {}
 
   arrow::Status init();
 
@@ -314,14 +303,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {
 
   arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, 
int64_t memLimit);
 
-  SplitState splitState_{kInit};
-
-  EvictState evictState_{kEvictable};
-
   BinaryArrayResizeState binaryArrayResizeState_{};
 
-  bool supportAvx512_ = false;
-
   bool hasComplexType_ = false;
   std::vector<bool> isValidityBuffer_;
 
@@ -415,66 +398,9 @@ class VeloxShuffleWriter final : public ShuffleWriter {
   std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_;
   std::shared_ptr<const facebook::velox::RowType> complexWriteType_;
 
-  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
-  std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
   facebook::velox::serializer::presto::PrestoVectorSerde serde_;
-  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions 
serdeOptions_;
-
-  // stat
-  enum CpuWallTimingType {
-    CpuWallTimingBegin = 0,
-    CpuWallTimingCompute = CpuWallTimingBegin,
-    CpuWallTimingBuildPartition,
-    CpuWallTimingEvictPartition,
-    CpuWallTimingHasNull,
-    CpuWallTimingCalculateBufferSize,
-    CpuWallTimingAllocateBuffer,
-    CpuWallTimingCreateRbFromBuffer,
-    CpuWallTimingMakeRB,
-    CpuWallTimingCacheRB,
-    CpuWallTimingFlattenRV,
-    CpuWallTimingSplitRV,
-    CpuWallTimingIteratePartitions,
-    CpuWallTimingStop,
-    CpuWallTimingEnd,
-    CpuWallTimingNum = CpuWallTimingEnd - CpuWallTimingBegin
-  };
-
-  static std::string CpuWallTimingName(CpuWallTimingType type) {
-    switch (type) {
-      case CpuWallTimingCompute:
-        return "CpuWallTimingCompute";
-      case CpuWallTimingBuildPartition:
-        return "CpuWallTimingBuildPartition";
-      case CpuWallTimingEvictPartition:
-        return "CpuWallTimingEvictPartition";
-      case CpuWallTimingHasNull:
-        return "CpuWallTimingHasNull";
-      case CpuWallTimingCalculateBufferSize:
-        return "CpuWallTimingCalculateBufferSize";
-      case CpuWallTimingAllocateBuffer:
-        return "CpuWallTimingAllocateBuffer";
-      case CpuWallTimingCreateRbFromBuffer:
-        return "CpuWallTimingCreateRbFromBuffer";
-      case CpuWallTimingMakeRB:
-        return "CpuWallTimingMakeRB";
-      case CpuWallTimingCacheRB:
-        return "CpuWallTimingCacheRB";
-      case CpuWallTimingFlattenRV:
-        return "CpuWallTimingFlattenRV";
-      case CpuWallTimingSplitRV:
-        return "CpuWallTimingSplitRV";
-      case CpuWallTimingIteratePartitions:
-        return "CpuWallTimingIteratePartitions";
-      case CpuWallTimingStop:
-        return "CpuWallTimingStop";
-      default:
-        return "CpuWallTimingUnknown";
-    }
-  }
 
-  facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum];
-  int32_t maxBatchSize_{0};
-}; // class VeloxShuffleWriter
+  SplitState splitState_{kInit};
+}; // class VeloxHashBasedShuffleWriter
 
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 5c4e01936..22298ef91 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -370,39 +370,104 @@ std::shared_ptr<ColumnarBatch> 
VeloxColumnarBatchDeserializer::next() {
 VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory(
     const std::shared_ptr<arrow::Schema>& schema,
     const std::shared_ptr<arrow::util::Codec>& codec,
+    // `codec` is used by the hash-based deserializer; `veloxCompressionType` by the non-hash (Presto serde) path.
+    const facebook::velox::common::CompressionKind veloxCompressionType,
     const RowTypePtr& rowType,
     int32_t batchSize,
     arrow::MemoryPool* memoryPool,
-    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool)
+    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+    // Selects which deserializer createDeserializer() builds.
+    ShuffleWriterType shuffleWriterType)
     : schema_(schema),
       codec_(codec),
+      veloxCompressionType_(veloxCompressionType),
       rowType_(rowType),
       batchSize_(batchSize),
       memoryPool_(memoryPool),
-      veloxPool_(veloxPool) {
+      veloxPool_(veloxPool),
+      shuffleWriterType_(shuffleWriterType) {
   initFromSchema();
 }
 
 std::unique_ptr<ColumnarBatchIterator> 
VeloxColumnarBatchDeserializerFactory::createDeserializer(
     std::shared_ptr<arrow::io::InputStream> in) {
-  return std::make_unique<VeloxColumnarBatchDeserializer>(
-      std::move(in),
-      schema_,
-      codec_,
+  // Hash-based shuffle data is decoded with the Arrow-buffer based deserializer.
+  if (shuffleWriterType_ == kHashShuffle) {
+    return std::make_unique<VeloxColumnarBatchDeserializer>(
+        std::move(in),
+        schema_,
+        codec_,
+        rowType_,
+        batchSize_,
+        memoryPool_,
+        veloxPool_.get(),
+        &isValidityBuffer_,
+        hasComplexType_,
+        deserializeTime_,
+        decompressTime_);
+  }
+  // Any other writer type is read as a stream of Presto-serialized row vectors.
+  // NOTE(review): the timing callbacks capture `this`, so the returned iterator must not outlive this factory.
+  return std::make_unique<VeloxShuffleReaderOutStreamWrapper>(
+      veloxPool_,
       rowType_,
       batchSize_,
-      memoryPool_,
-      veloxPool_.get(),
-      &isValidityBuffer_,
-      hasComplexType_,
-      deserializeTime_,
-      decompressTime_);
+      veloxCompressionType_,
+      [this](int64_t decompressionTime) { this->decompressTime_ += decompressionTime; },
+      [this](int64_t deserializeTime) { this->deserializeTime_ += deserializeTime; },
+      // Move rather than copy (as the hash branch does): `in` is not used after this point.
+      std::move(in));
+}
+
+// Wraps `in` in a VeloxInputStream backed by a single reusable read buffer and prepares the Presto serde
+// options used by next() to deserialize row vectors.
+// By-value parameters are intentionally non-const here (top-level const does not affect the signature), so
+// they can actually be moved into members instead of copied.
+VeloxShuffleReaderOutStreamWrapper::VeloxShuffleReaderOutStreamWrapper(
+    const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool,
+    const RowTypePtr& rowType,
+    int32_t batchSize,
+    facebook::velox::common::CompressionKind veloxCompressionType,
+    std::function<void(int64_t)> decompressionTimeAccumulator,
+    std::function<void(int64_t)> deserializeTimeAccumulator,
+    std::shared_ptr<arrow::io::InputStream> in)
+    : veloxPool_(veloxPool),
+      rowType_(rowType),
+      batchSize_(batchSize),
+      veloxCompressionType_(veloxCompressionType),
+      decompressionTimeAccumulator_(std::move(decompressionTimeAccumulator)),
+      deserializeTimeAccumulator_(std::move(deserializeTimeAccumulator)) {
+  // Just under 1 MiB; presumably sized so the padded AlignedBuffer allocation stays within 1 MiB.
+  constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize;
+  auto buffer = AlignedBuffer::allocate<char>(kMaxReadBufferSize, veloxPool_.get());
+  in_ = std::make_unique<VeloxInputStream>(std::move(in), std::move(buffer));
+  // NOTE(review): useLosslessTimestamp is false here, while the VeloxShuffleWriter base constructor sets it to
+  // true. Confirm the sort-based writer serializes with matching options, or TIMESTAMP columns may not round-trip.
+  serdeOptions_ = {false, veloxCompressionType_};
+}
+
+// Returns the next batch, or nullptr when the input stream is exhausted. Consecutive deserialized row
+// vectors are appended together until the batch holds at least batchSize_ rows or the input runs out.
+std::shared_ptr<ColumnarBatch> VeloxShuffleReaderOutStreamWrapper::next() {
+  if (!in_->hasNext()) {
+    return nullptr;
+  }
+
+  RowVectorPtr rowVector;
+  VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, &rowVector, 
&serdeOptions_);
+
+  // A single serialized vector already fills a batch: return it without merging.
+  if (rowVector->size() >= batchSize_) {
+    return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
+  }
+
+  // Append following vectors into rowVector until the batch is large enough or the input is exhausted.
+  while (rowVector->size() < batchSize_ && in_->hasNext()) {
+    RowVectorPtr rowVectorTemp;
+    VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, 
&rowVectorTemp, &serdeOptions_);
+    rowVector->append(rowVectorTemp.get());
+  }
+
+  // NOTE(review): both timings are hard-coded to 0 and the early return above reports nothing, so
+  // decompression/deserialization time is not actually measured on this path yet. TODO.
+  int64_t decompressTime = 0LL;
+  int64_t deserializeTime = 0LL;
+
+  decompressionTimeAccumulator_(decompressTime);
+  deserializeTimeAccumulator_(deserializeTime);
+  return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
 arrow::MemoryPool* VeloxColumnarBatchDeserializerFactory::getPool() {
+  // Arrow pool that createDeserializer() hands to hash-based deserializers.
   return memoryPool_;
 }
 
+// Returns the shuffle writer type this factory was configured with; it decides which deserializer is built.
+ShuffleWriterType VeloxColumnarBatchDeserializerFactory::getShuffleWriterType() {
+  const ShuffleWriterType configuredType = shuffleWriterType_;
+  return configuredType;
+}
+
 int64_t VeloxColumnarBatchDeserializerFactory::getDecompressTime() {
+  // decompressTime_ is passed to hash-based deserializers and incremented by the callback given to non-hash ones.
   return decompressTime_;
 }
@@ -440,4 +505,30 @@ void 
VeloxColumnarBatchDeserializerFactory::initFromSchema() {
     }
   }
 }
+
+// Stores the source stream and the read buffer, then performs the first read immediately so that
+// hasNext() is meaningful right after construction.
+VeloxInputStream::VeloxInputStream(std::shared_ptr<arrow::io::InputStream> 
input, facebook::velox::BufferPtr buffer)
+    : in_(std::move(input)), buffer_(std::move(buffer)) {
+  next(true);
+}
+
+// Reports whether unread bytes remain, refilling the buffer from the underlying stream once the
+// current range has been fully consumed.
+bool VeloxInputStream::hasNext() {
+  // The last refill produced no bytes: the underlying stream is exhausted.
+  if (offset_ == 0) {
+    return false;
+  }
+  const bool rangeConsumed = ranges()[0].position >= ranges()[0].size;
+  if (!rangeConsumed) {
+    return true;
+  }
+  // Current buffer is used up; try to pull more bytes.
+  next(true);
+  return offset_ != 0;
+}
+
+// Refills buffer_ with up to buffer_->capacity() bytes from in_ and exposes them as the current range.
+// offset_ records how many bytes were read; 0 means end of stream.
+// NOTE(review): throwIfPastEnd is not consulted; end of input is reported through hasNext() instead.
+void VeloxInputStream::next(bool throwIfPastEnd) {
+  const uint32_t readBytes = buffer_->capacity();
+  auto readResult = in_->Read(readBytes, buffer_->asMutable<char>());
+  // Surface I/O failures instead of treating them as end-of-stream, which would silently drop shuffle data.
+  VELOX_CHECK(readResult.ok(), "Failed to read shuffle input stream: {}", readResult.status().ToString());
+  offset_ = readResult.ValueOrDie();
+  if (offset_ > 0) {
+    // The buffer capacity is bounded by the caller (about 1 MiB), so the byte count fits ByteRange's int32 size.
+    setRange({buffer_->asMutable<uint8_t>(), static_cast<int32_t>(offset_), 0});
+  }
+}
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 18df38006..3a0d8f9ff 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -22,6 +22,8 @@
 #include "velox/type/Type.h"
 #include "velox/vector/ComplexVector.h"
 
+#include <velox/serializers/PrestoSerializer.h>
+
 namespace gluten {
 
 class VeloxColumnarBatchDeserializer final : public ColumnarBatchIterator {
@@ -59,15 +61,57 @@ class VeloxColumnarBatchDeserializer final : public 
ColumnarBatchIterator {
   bool reachEos_{false};
 };
 
+// ByteInputStream over an arrow::io::InputStream: a single buffer is repeatedly refilled from the arrow
+// stream and exposed as the current byte range for Velox deserialization.
+class VeloxInputStream : public facebook::velox::ByteInputStream {
+ public:
+  // Stores the stream and the read buffer and performs the first read.
+  VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, 
facebook::velox::BufferPtr buffer);
+
+  // True while unread bytes remain; refills the buffer when the current range is consumed.
+  bool hasNext();
+
+  // Refills buffer_ with up to buffer_->capacity() bytes read from in_.
+  void next(bool throwIfPastEnd) override;
+
+  std::shared_ptr<arrow::io::InputStream> in_;
+  const facebook::velox::BufferPtr buffer_;
+  // Number of bytes returned by the most recent read; 0 means that read returned no bytes.
+  // The -1 initializer is overwritten by the constructor's first read.
+  uint64_t offset_ = -1;
+};
+
+// ColumnarBatchIterator over a stream of Presto-serialized row vectors (the non-hash shuffle path).
+// Consecutive small row vectors are concatenated so that batches reach batchSize rows when input allows.
+class VeloxShuffleReaderOutStreamWrapper : public ColumnarBatchIterator {
+ public:
+  VeloxShuffleReaderOutStreamWrapper(
+      const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool,
+      const facebook::velox::RowTypePtr& rowType,
+      int32_t batchSize,
+      const facebook::velox::common::CompressionKind veloxCompressionType,
+      const std::function<void(int64_t)> decompressionTimeAccumulator,
+      const std::function<void(int64_t)> deserializeTimeAccumulator,
+      const std::shared_ptr<arrow::io::InputStream> in);
+
+  // Returns the next batch, or nullptr once the input is exhausted.
+  std::shared_ptr<ColumnarBatch> next() override;
+
+ private:
+  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
+  facebook::velox::RowTypePtr rowType_;
+  int32_t batchSize_;
+  facebook::velox::common::CompressionKind veloxCompressionType_;
+  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_;
+  // Callbacks through which elapsed decompression/deserialization time is reported to the owner.
+  std::function<void(int64_t)> decompressionTimeAccumulator_;
+  std::function<void(int64_t)> deserializeTimeAccumulator_;
+  std::shared_ptr<VeloxInputStream> in_;
+};
+
 class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory {
  public:
   VeloxColumnarBatchDeserializerFactory(
       const std::shared_ptr<arrow::Schema>& schema,
       const std::shared_ptr<arrow::util::Codec>& codec,
+      const facebook::velox::common::CompressionKind veloxCompressionType,
       const facebook::velox::RowTypePtr& rowType,
       int32_t batchSize,
       arrow::MemoryPool* memoryPool,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool);
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      ShuffleWriterType shuffleWriterType);
 
   std::unique_ptr<ColumnarBatchIterator> 
createDeserializer(std::shared_ptr<arrow::io::InputStream> in) override;
 
@@ -77,9 +121,12 @@ class VeloxColumnarBatchDeserializerFactory : public 
DeserializerFactory {
 
   int64_t getDeserializeTime() override;
 
+  ShuffleWriterType getShuffleWriterType() override;
+
  private:
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::util::Codec> codec_;
+  facebook::velox::common::CompressionKind veloxCompressionType_;
   facebook::velox::RowTypePtr rowType_;
   int32_t batchSize_;
   arrow::MemoryPool* memoryPool_;
@@ -88,6 +135,8 @@ class VeloxColumnarBatchDeserializerFactory : public 
DeserializerFactory {
   std::vector<bool> isValidityBuffer_;
   bool hasComplexType_{false};
 
+  ShuffleWriterType shuffleWriterType_;
+
   int64_t deserializeTime_{0};
   int64_t decompressTime_{0};
 
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h 
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index e699a323b..104b87616 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -46,157 +46,60 @@
 
 namespace gluten {
 
-// set 1 to open print
-#define VELOX_SHUFFLE_WRITER_PRINT 0
-
-#if VELOX_SHUFFLE_WRITER_PRINT
-
-#define VsPrint Print
-#define VsPrintLF PrintLF
-#define VsPrintSplit PrintSplit
-#define VsPrintSplitLF PrintSplitLF
-#define VsPrintVectorRange PrintVectorRange
-#define VS_PRINT PRINT
-#define VS_PRINTLF PRINTLF
-#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME
-#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE
-#define VS_PRINT_CONTAINER PRINT_CONTAINER
-#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING
-#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING
-#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING
-#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING
-#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING
-
-#else // VELOX_SHUFFLE_WRITER_PRINT
-
-#define VsPrint(...) // NOLINT
-#define VsPrintLF(...) // NOLINT
-#define VsPrintSplit(...) // NOLINT
-#define VsPrintSplitLF(...) // NOLINT
-#define VsPrintVectorRange(...) // NOLINT
-#define VS_PRINT(a)
-#define VS_PRINTLF(a)
-#define VS_PRINT_FUNCTION_NAME()
-#define VS_PRINT_FUNCTION_SPLIT_LINE()
-#define VS_PRINT_CONTAINER(c)
-#define VS_PRINT_CONTAINER_TO_STRING(c)
-#define VS_PRINT_CONTAINER_2_STRING(c)
-#define VS_PRINT_VECTOR_TO_STRING(v)
-#define VS_PRINT_VECTOR_2_STRING(v)
-#define VS_PRINT_VECTOR_MAPPING(v)
-
-#endif // end of VELOX_SHUFFLE_WRITER_PRINT
-
-enum SplitState { kInit, kPreAlloc, kSplit, kStop };
-enum EvictState { kEvictable, kUnevictable };
-
-struct BinaryArrayResizeState {
-  bool inResize;
-  uint32_t partitionId;
-  uint32_t binaryIdx;
-
-  BinaryArrayResizeState() : inResize(false) {}
-  BinaryArrayResizeState(uint32_t partitionId, uint32_t binaryIdx)
-      : inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {}
-};
-
-class VeloxShuffleWriter final : public ShuffleWriter {
-  enum {
-    kValidityBufferIndex = 0,
-    kFixedWidthValueBufferIndex = 1,
-    kBinaryValueBufferIndex = 2,
-    kBinaryLengthBufferIndex = kFixedWidthValueBufferIndex
-  };
-
+class VeloxShuffleWriter : public ShuffleWriter {
  public:
-  struct BinaryBuf {
-    BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn, 
uint64_t valueOffsetIn)
-        : valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn), 
valueOffset(valueOffsetIn) {}
-
-    BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) : 
BinaryBuf(value, length, valueCapacity, 0) {}
-
-    BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {}
-
-    uint8_t* valuePtr;
-    uint8_t* lengthPtr;
-    uint64_t valueCapacity;
-    uint64_t valueOffset;
-  };
-
-  static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
-      uint32_t numPartitions,
-      std::unique_ptr<PartitionWriter> partitionWriter,
-      ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* arrowPool);
-
-  arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) 
override;
-
-  arrow::Status stop() override;
-
-  arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
-
-  const uint64_t cachedPayloadSize() const override;
-
-  arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers);
-
-  int64_t rawPartitionBytes() const;
-
-  // For test only.
-  void setPartitionBufferSize(uint32_t newSize);
+  // Returns a RowVector with the same rows as `rv` but without its first column (the partition id column
+  // read by getFirstColumn). Child vectors are shared with `rv`, not copied. The new row type is built from
+  // the child types only; column names are not passed along.
+  facebook::velox::RowVectorPtr getStrippedRowVector(const facebook::velox::RowVector& rv) {
+    // Erasing begin() of an empty vector is undefined behavior; fail loudly like getFirstColumn does.
+    VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column.");
+
+    // Bind by reference: asRow() returns the existing RowType, copying it is unnecessary.
+    const auto& rowType = rv.type()->asRow();
+    auto typeChildren = rowType.children();
+    typeChildren.erase(typeChildren.begin());
+    auto newRowType = facebook::velox::ROW(std::move(typeChildren));
+
+    // Children are shared pointers; dropping the first one does not copy column data.
+    auto children = rv.children();
+    children.erase(children.begin());
+
+    return std::make_shared<facebook::velox::RowVector>(
+        rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), rv.size(), std::move(children));
+  }
 
-  // for debugging
-  void printColumnsInfo() const {
-    VS_PRINT_FUNCTION_SPLIT_LINE();
-    VS_PRINTLF(fixed_width_column_count_);
+  // Returns a raw pointer to the int32 values of column 0 (partition key hash or partition id).
+  // Throws if `rv` has no children, or if column 0 is not flat or not integer-typed.
+  // The pointer refers to the child vector's buffer and is only valid while that vector is alive.
+  const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) {
+    VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id 
column.");
 
-    VS_PRINT_CONTAINER(simple_column_indices_);
-    VS_PRINT_CONTAINER(binary_column_indices_);
-    VS_PRINT_CONTAINER(complex_column_indices_);
+    auto& firstChild = rv.childAt(0);
+    VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not 
flat encoding.");
+    VELOX_CHECK(
+        firstChild->type()->isInteger(),
+        "Partition id (field 0) should be integer, but got {}",
+        firstChild->type()->toString());
 
-    VS_PRINT_VECTOR_2_STRING(velox_column_types_);
-    VS_PRINT_VECTOR_TO_STRING(arrow_column_types_);
+    // first column is partition key hash value or pid
+    return firstChild->asFlatVector<int32_t>()->rawValues();
   }
 
-  void printPartition() const {
-    VS_PRINT_FUNCTION_SPLIT_LINE();
-    // row ID -> partition ID
-    VS_PRINT_VECTOR_MAPPING(row_2_partition_);
-
-    // partition -> row count
-    VS_PRINT_VECTOR_MAPPING(partition_2_row_count_);
-  }
+  // For test only. The default implementation is a no-op; writers with partition buffers
+  // (e.g. VeloxHashBasedShuffleWriter) override it.
+  virtual void setPartitionBufferSize(uint32_t newSize) {}
 
-  void printPartitionBuffer() const {
-    VS_PRINT_FUNCTION_SPLIT_LINE();
-    VS_PRINT_VECTOR_MAPPING(partition_2_buffer_size_);
-    VS_PRINT_VECTOR_MAPPING(partitionBufferBase_);
+  // Default implementation does nothing and returns OK; writers that buffer per partition
+  // (e.g. VeloxHashBasedShuffleWriter) override it.
+  virtual arrow::Status evictPartitionBuffers(uint32_t partitionId, bool 
reuseBuffers) {
+    return arrow::Status::OK();
   }
 
-  void printPartition2Row() const {
-    VS_PRINT_FUNCTION_SPLIT_LINE();
-    VS_PRINT_VECTOR_MAPPING(partition2RowOffsetBase_);
-
-#if VELOX_SHUFFLE_WRITER_PRINT
-    for (auto pid = 0; pid < numPartitions_; ++pid) {
-      auto begin = partition2RowOffsetBase_[pid];
-      auto end = partition2RowOffsetBase_[pid + 1];
-      VsPrint("partition", pid);
-      VsPrintVectorRange(rowOffset2RowId_, begin, end);
-    }
-#endif
+  // Default implementation does nothing and returns OK.
+  // NOTE(review): presumably overridden by the sort-based writer to flush buffered row vectors; confirm.
+  virtual arrow::Status evictRowVector(uint32_t partitionId) {
+    return arrow::Status::OK();
   }
 
-  void printInputHasNull() const {
-    VS_PRINT_FUNCTION_SPLIT_LINE();
-    VS_PRINT_CONTAINER(input_has_null_);
+  // Default implementation reports 0 cached payload bytes; subclasses that cache payloads override it.
+  virtual const uint64_t cachedPayloadSize() const {
+    return 0;
   }
 
   int32_t maxBatchSize() const {
+    // Accessor for the protected maxBatchSize_ member (default-initialized to 0).
     return maxBatchSize_;
   }
 
- private:
+ protected:
   VeloxShuffleWriter(
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
@@ -209,216 +112,19 @@ class VeloxShuffleWriter final : public ShuffleWriter {
     serdeOptions_.useLosslessTimestamp = true;
   }
 
-  arrow::Status init();
-
-  arrow::Status initPartitions();
-
-  arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);
-
-  arrow::Status splitRowVector(const facebook::velox::RowVector& rv);
-
-  arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
-
-  arrow::Status buildPartition2Row(uint32_t rowNum);
-
-  arrow::Status updateInputHasNull(const facebook::velox::RowVector& rv);
-
-  void setSplitState(SplitState state);
-
-  arrow::Status doSplit(const facebook::velox::RowVector& rv, int64_t 
memLimit);
-
-  bool beyondThreshold(uint32_t partitionId, uint32_t newSize);
-
-  uint32_t calculatePartitionBufferSize(const facebook::velox::RowVector& rv, 
int64_t memLimit);
-
-  arrow::Status preAllocPartitionBuffers(uint32_t preAllocBufferSize);
-
-  arrow::Status updateValidityBuffers(uint32_t partitionId, uint32_t newSize);
-
-  arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
-  allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize);
-
-  arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t 
newSize);
-
-  arrow::Status splitFixedWidthValueBuffer(const facebook::velox::RowVector& 
rv);
-
-  arrow::Status splitBoolType(const uint8_t* srcAddr, const 
std::vector<uint8_t*>& dstAddrs);
-
-  arrow::Status splitValidityBuffer(const facebook::velox::RowVector& rv);
+  // Virtual so that destroying a writer through a base pointer runs the derived destructor.
+  virtual ~VeloxShuffleWriter() = default;
 
-  arrow::Status splitBinaryArray(const facebook::velox::RowVector& rv);
-
-  arrow::Status splitComplexType(const facebook::velox::RowVector& rv);
-
-  arrow::Status evictBuffers(
-      uint32_t partitionId,
-      uint32_t numRows,
-      std::vector<std::shared_ptr<arrow::Buffer>> buffers,
-      bool reuseBuffers);
-
-  arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
assembleBuffers(uint32_t partitionId, bool reuseBuffers);
-
-  template <typename T>
-  arrow::Status splitFixedType(const uint8_t* srcAddr, const 
std::vector<uint8_t*>& dstAddrs) {
-    for (auto& pid : partitionUsed_) {
-      auto dstPidBase = (T*)(dstAddrs[pid] + partitionBufferBase_[pid] * 
sizeof(T));
-      auto pos = partition2RowOffsetBase_[pid];
-      auto end = partition2RowOffsetBase_[pid + 1];
-      for (; pos < end; ++pos) {
-        auto rowId = rowOffset2RowId_[pos];
-        *dstPidBase++ = reinterpret_cast<const T*>(srcAddr)[rowId]; // copy
-      }
-    }
-    return arrow::Status::OK();
-  }
-
-  arrow::Status splitBinaryType(
-      uint32_t binaryIdx,
-      const facebook::velox::FlatVector<facebook::velox::StringView>& src,
-      std::vector<BinaryBuf>& dst);
-
-  arrow::Result<int64_t> evictCachedPayload(int64_t size);
-
-  arrow::Result<std::shared_ptr<arrow::Buffer>> 
generateComplexTypeBuffers(facebook::velox::RowVectorPtr vector);
-
-  arrow::Status resetValidityBuffer(uint32_t partitionId);
-
-  arrow::Result<int64_t> shrinkPartitionBuffersMinSize(int64_t size);
-
-  arrow::Result<int64_t> evictPartitionBuffersMinSize(int64_t size);
-
-  arrow::Status shrinkPartitionBuffer(uint32_t partitionId);
-
-  arrow::Status resetPartitionBuffer(uint32_t partitionId);
-
-  // Resize the partition buffer to newSize. If preserveData is true, it will 
keep the data in buffer.
-  // Note when preserveData is false, and newSize is larger, this function can 
introduce unnecessary memory copy.
-  // In this case, use allocatePartitionBuffer to free current buffers and 
allocate new buffers instead.
-  arrow::Status resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, 
bool preserveData);
-
-  uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize);
-
-  uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, 
uint32_t newSize);
-
-  void calculateSimpleColumnBytes();
-
-  void stat() const;
-
-  bool shrinkPartitionBuffersAfterSpill() const;
-
-  bool evictPartitionBuffersAfterSpill() const;
-
-  arrow::Result<uint32_t> partitionBufferSizeAfterShrink(uint32_t partitionId) 
const;
-
-  bool isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const;
-
-  arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, 
int64_t memLimit);
-
-  SplitState splitState_{kInit};
+  std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
 
-  EvictState evictState_{kEvictable};
+  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions 
serdeOptions_;
 
-  BinaryArrayResizeState binaryArrayResizeState_{};
+  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
 
   bool supportAvx512_ = false;
 
-  bool hasComplexType_ = false;
-  std::vector<bool> isValidityBuffer_;
-
-  // Store arrow column types. Calculated once.
-  std::vector<std::shared_ptr<arrow::DataType>> arrowColumnTypes_;
-
-  // Store velox column types. Calculated once.
-  std::vector<std::shared_ptr<const facebook::velox::Type>> veloxColumnTypes_;
-
-  // How many fixed-width columns in the schema. Calculated once.
-  uint32_t fixedWidthColumnCount_ = 0;
-
-  // The column indices of all binary types in the schema.
-  std::vector<uint32_t> binaryColumnIndices_;
-
-  // The column indices of all fixed-width and binary columns in the schema.
-  std::vector<uint32_t> simpleColumnIndices_;
-
-  // The column indices of all complex types in the schema, including Struct, 
Map, List columns.
-  std::vector<uint32_t> complexColumnIndices_;
-
-  // Total bytes of fixed-width buffers of all simple columns. Including 
validity buffers, value buffers of
-  // fixed-width types and length buffers of binary types.
-  // Used for estimating pre-allocated partition buffer size. Calculated once.
-  uint32_t fixedWidthBufferBytes_ = 0;
-
-  // Used for calculating the average binary length.
-  // Updated for each input RowVector.
-  uint64_t totalInputNumRows_ = 0;
-  std::vector<uint64_t> binaryArrayTotalSizeBytes_;
-
-  // True if input column has null in any processed input RowVector.
-  // In the order of fixed-width columns + binary columns.
-  std::vector<bool> inputHasNull_;
-
-  // Records which partitions are actually occurred in the current input 
RowVector.
-  // Most of the loops can loop on this array to avoid visiting unused 
partition id.
-  std::vector<uint32_t> partitionUsed_;
-
-  // Row ID -> Partition ID
-  // subscript: The index of row in the current input RowVector
-  // value: Partition ID
-  // Updated for each input RowVector.
-  std::vector<uint32_t> row2Partition_;
-
-  // Partition ID -> Row Count
-  // subscript: Partition ID
-  // value: How many rows does this partition have in the current input 
RowVector
-  // Updated for each input RowVector.
-  std::vector<uint32_t> partition2RowCount_;
-
-  // Note: partition2RowOffsetBase_ and rowOffset2RowId_ are the optimization 
of flattening the 2-dimensional vector
-  // into single dimension.
-  // The first dimension is the partition id. The second dimension is the ith 
occurrence of this partition in the
-  // input RowVector. The value is the index of the row in the input RowVector.
-  // partition2RowOffsetBase_ records the offset of the first dimension.
-  //
-  // The index of the ith occurrence of a give partition `pid` in the input 
RowVector can be calculated via
-  // rowOffset2RowId_[partition2RowOffsetBase_[pid] + i]
-  // i is in the range of [0, partition2RowCount_[pid])
-
-  // Partition ID -> Row offset, elements num: Partition num + 1
-  // subscript: Partition ID
-  // value: The base row offset of this Partition
-  // Updated for each input RowVector.
-  std::vector<uint32_t> partition2RowOffsetBase_;
-
-  // Row offset -> Source row ID, elements num: input RowVector row num
-  // subscript: Row offset
-  // value: The index of row in the current input RowVector
-  // Updated for each input RowVector.
-  std::vector<uint32_t> rowOffset2RowId_;
-
-  // Partition buffers are used for holding the intermediate data during split.
-  // Partition ID -> Partition buffer size(unit is row)
-  std::vector<uint32_t> partitionBufferSize_;
-
-  // The write position of partition buffer. Updated after split. Reset when 
partition buffers are reallocated.
-  std::vector<uint32_t> partitionBufferBase_;
-
-  // Used by all simple types. Stores raw pointers of partition buffers.
-  std::vector<std::vector<uint8_t*>> partitionValidityAddrs_;
-  // Used by fixed-width types. Stores raw pointers of partition buffers.
-  std::vector<std::vector<uint8_t*>> partitionFixedWidthValueAddrs_;
-  // Used by binary types. Stores raw pointers and metadata of partition 
buffers.
-  std::vector<std::vector<BinaryBuf>> partitionBinaryAddrs_;
-
-  // Used by complex types.
-  // Partition id -> Serialized complex data.
-  std::vector<std::unique_ptr<facebook::velox::IterativeVectorSerializer>> 
complexTypeData_;
-  std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_;
-  std::shared_ptr<const facebook::velox::RowType> complexWriteType_;
+  int32_t maxBatchSize_{0};
 
-  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
-  std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
-  facebook::velox::serializer::presto::PrestoVectorSerde serde_;
-  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions 
serdeOptions_;
+  enum EvictState { kEvictable, kUnevictable };
 
   // stat
   enum CpuWallTimingType {
@@ -474,7 +180,28 @@ class VeloxShuffleWriter final : public ShuffleWriter {
   }
 
   facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum];
-  int32_t maxBatchSize_{0};
-}; // class VeloxShuffleWriter
+
+  EvictState evictState_{kEvictable};
+
+  // RAII guard that marks the writer as unevictable for the guard's lifetime. On destruction it restores
+  // the state observed at construction, rather than unconditionally setting kEvictable. As a result, a
+  // nested guard cannot re-enable eviction while an outer guard is still active.
+  class EvictGuard {
+   public:
+    explicit EvictGuard(EvictState& evictState) : evictState_(evictState), previousState_(evictState) {
+      evictState_ = EvictState::kUnevictable;
+    }
+
+    ~EvictGuard() {
+      evictState_ = previousState_;
+    }
+
+    // Non-copyable and non-movable: a guard is bound to exactly one scope.
+    EvictGuard(const EvictGuard&) = delete;
+    EvictGuard& operator=(const EvictGuard&) = delete;
+    EvictGuard(EvictGuard&&) = delete;
+    EvictGuard& operator=(EvictGuard&&) = delete;
+
+   private:
+    EvictState& evictState_;
+    // State at construction time; restored by the destructor.
+    const EvictState previousState_;
+  };
+};
 
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
new file mode 100644
index 000000000..b0c2cc8ad
--- /dev/null
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxSortBasedShuffleWriter.h"
+#include "memory/ArrowMemory.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/ShuffleSchema.h"
+#include "utils/Common.h"
+#include "utils/VeloxArrowUtils.h"
+#include "utils/macros.h"
+#include "velox/common/base/Nulls.h"
+#include "velox/type/Type.h"
+#include "velox/vector/ComplexVector.h"
+
+#if defined(__x86_64__)
+#include <immintrin.h>
+#include <x86intrin.h>
+#elif defined(__aarch64__)
+#include <arm_neon.h>
+#endif
+
+namespace gluten {
+
+#define VELOX_SHUFFLE_WRITER_LOG_FLAG 0
+
+// Rotates the 8-bit value 'x' left by 's' bits, where 's' is a 32-bit integer:
+// (x << (s mod 8)) | (x >> (8 - s mod 8)).
+// Both arguments are parenthesized at every use so the macro expands correctly for compound
+// expressions such as `a + b`. (Not referenced elsewhere in this translation unit.)
+#if !defined(__x86_64__)
+#define rotateLeft(x, s) (((x) << ((s) - (((s) >> 3) << 3))) | ((x) >> (8 - ((s) - (((s) >> 3) << 3)))))
+#endif
+
+// on x86 machines, _MM_HINT_T0,T1,T2 are defined as 1, 2, 3
+// equivalent mapping to __builtin_prefetch hints is 3, 2, 1
+#if defined(__x86_64__)
+#define PREFETCHT0(ptr) _mm_prefetch(ptr, _MM_HINT_T0)
+#define PREFETCHT1(ptr) _mm_prefetch(ptr, _MM_HINT_T1)
+#define PREFETCHT2(ptr) _mm_prefetch(ptr, _MM_HINT_T2)
+#else
+#define PREFETCHT0(ptr) __builtin_prefetch(ptr, 0, 3)
+#define PREFETCHT1(ptr) __builtin_prefetch(ptr, 0, 2)
+#define PREFETCHT2(ptr) __builtin_prefetch(ptr, 0, 1)
+#endif
+
+arrow::Result<std::shared_ptr<VeloxShuffleWriter>> 
VeloxSortBasedShuffleWriter::create(
+    uint32_t numPartitions,
+    std::unique_ptr<PartitionWriter> partitionWriter,
+    ShuffleWriterOptions options,
+    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+    arrow::MemoryPool* arrowPool) {
+  std::shared_ptr<VeloxSortBasedShuffleWriter> res(new 
VeloxSortBasedShuffleWriter(
+      numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool));
+  RETURN_NOT_OK(res->init());
+  return res;
+} // namespace gluten
+
+// One-time setup run by create(): CPU feature probe, partitioner construction and
+// per-partition bookkeeping.
+arrow::Status VeloxSortBasedShuffleWriter::init() {
+  // AVX-512BW is only probed on x86-64; every other target reports it as unsupported.
+#if defined(__x86_64__)
+  supportAvx512_ = __builtin_cpu_supports("avx512bw");
+#else
+  supportAvx512_ = false;
+#endif
+
+  ARROW_ASSIGN_OR_RAISE(
+      partitioner_, Partitioner::make(options_.partitioning, numPartitions_, options_.startPartitionId));
+  DLOG(INFO) << "Create partitioning type: " << std::to_string(options_.partitioning);
+
+  partition2RowCount_.resize(numPartitions_);
+
+  // Create an index list for every partition up front, each with room for bufferSize entries.
+  rowVectorIndexMap_.reserve(numPartitions_);
+  for (int32_t partition = 0; partition < numPartitions_; ++partition) {
+    auto& indices = rowVectorIndexMap_[partition];
+    indices.reserve(options_.bufferSize);
+  }
+
+  return arrow::Status::OK();
+}
+
+// Buffers `rv` and, once the estimated flat size of all buffered batches exceeds `memLimit`,
+// evicts every partition and drops the buffered batches. Leaves the writer in kSortInit.
+arrow::Status VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit) {
+  currentInputColumnBytes_ += rv->estimateFlatSize();
+  batches_.push_back(std::move(rv));
+  if (currentInputColumnBytes_ > memLimit) {
+    // Block reclaimFixedSize() while evicting. For kSingle the sort state is still kSortInit here,
+    // so a reclaim triggered by an allocation during eviction would otherwise evict again and
+    // clear batches_ while evictRowVector() is iterating over it.
+    EvictGuard evictGuard{evictState_};
+    for (auto pid = 0; pid < numPartitions(); ++pid) {
+      RETURN_NOT_OK(evictRowVector(pid));
+      partition2RowCount_[pid] = 0;
+    }
+    batches_.clear();
+    currentInputColumnBytes_ = 0;
+  }
+  setSortState(SortState::kSortInit);
+  return arrow::Status::OK();
+}
+
+// Buffers one input batch.
+// For partitionings that carry partition ids, partitioner_->compute() is called first and is
+// given batches_.size() -- the position this batch will occupy once doSort() appends it -- so the
+// entries it records in rowVectorIndexMap_ can point back into batches_ (presumably encoded as
+// (batch index << 32) | row, which is how evictRowVector() decodes them). doSort() then buffers
+// the row vector and evicts once partitionWriter_->options().sortBufferMaxSize is exceeded.
+// NOTE(review): the memLimit parameter is not used by this implementation.
+arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) {
+  if (options_.partitioning == Partitioning::kSingle) {
+    // No partition ids: evictRowVector() serializes all of batches_ for kSingle.
+    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
+    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
+    auto rv = veloxColumnBatch->getFlattenedRowVector();
+    RETURN_NOT_OK(initFromRowVector(*rv.get()));
+    RETURN_NOT_OK(doSort(rv, partitionWriter_.get()->options().sortBufferMaxSize));
+  } else if (options_.partitioning == Partitioning::kRange) {
+    // Range partitioning arrives as a composite of two batches: [0] holds the partition ids
+    // (first column), [1] holds the data rows.
+    auto compositeBatch = std::dynamic_pointer_cast<CompositeColumnarBatch>(cb);
+    VELOX_CHECK_NOT_NULL(compositeBatch);
+    auto batches = compositeBatch->getBatches();
+    VELOX_CHECK_EQ(batches.size(), 2);
+    auto pidBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[0]);
+    auto pidArr = getFirstColumn(*(pidBatch->getRowVector()));
+    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+    setSortState(SortState::kSort);
+    RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), batches_.size(), rowVectorIndexMap_));
+    END_TIMING();
+    auto rvBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[1]);
+    auto rv = rvBatch->getFlattenedRowVector();
+    RETURN_NOT_OK(initFromRowVector(*rv.get()));
+    RETURN_NOT_OK(doSort(rv, partitionWriter_.get()->options().sortBufferMaxSize));
+  } else {
+    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
+    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
+    facebook::velox::RowVectorPtr rv;
+    // START_TIMING/END_TIMING delimit a timed section; rv is declared outside it so it stays usable.
+    START_TIMING(cpuWallTimingList_[CpuWallTimingFlattenRV]);
+    rv = veloxColumnBatch->getFlattenedRowVector();
+    END_TIMING();
+    if (partitioner_->hasPid()) {
+      // The first column carries precomputed partition ids; the stripped vector (presumably
+      // without that column) is what gets buffered and serialized.
+      auto pidArr = getFirstColumn(*rv);
+      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+      setSortState(SortState::kSort);
+      RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), batches_.size(), rowVectorIndexMap_));
+      END_TIMING();
+      auto strippedRv = getStrippedRowVector(*rv);
+      RETURN_NOT_OK(initFromRowVector(*strippedRv));
+      RETURN_NOT_OK(doSort(strippedRv, partitionWriter_.get()->options().sortBufferMaxSize));
+    } else {
+      // The partitioner derives partition ids itself (no pid column is passed in).
+      RETURN_NOT_OK(initFromRowVector(*rv));
+      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+      setSortState(SortState::kSort);
+      RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), batches_.size(), rowVectorIndexMap_));
+      END_TIMING();
+      RETURN_NOT_OK(doSort(rv, partitionWriter_.get()->options().sortBufferMaxSize));
+    }
+  }
+  return arrow::Status::OK();
+}
+
+// Flushes the rows accumulated in batch_ for `partitionId`: serializes them into `output`
+// (through `out`), passes the bytes plus the pre-serialization size to the partition writer,
+// then resets the stream and starts a fresh serializer group of type `*rowTypePtr`.
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(
+    uint32_t partitionId,
+    std::ostringstream* output,
+    facebook::velox::OStreamOutputStream* out,
+    facebook::velox::RowTypePtr* rowTypePtr) {
+  const int64_t rawSize = batch_->size();
+  batch_->flush(out);
+  const auto serialized = output->str();
+  RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, serialized.c_str(), serialized.size()));
+
+  // Release the old group before touching the stream, matching the original teardown order.
+  batch_.reset();
+
+  // Clear both the stream's state flags and its contents for the next batch.
+  output->clear();
+  output->str("");
+
+  auto freshGroup = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
+  batch_ = std::move(freshGroup);
+  batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_);
+  return arrow::Status::OK();
+}
+
+// Serializes every buffered row of `partitionId` and hands it to the partition writer in chunks
+// of at least options_.bufferSize rows (a final partial chunk is flushed at the end).
+// For kSingle, all of batches_ is written; otherwise the rows listed in rowVectorIndexMap_ for the
+// partition are written and that entry is removed.
+arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) {
+  // initFromRowVector() has never run, so no batch has been buffered and batch_ does not exist.
+  // Returning here avoids rowType_.value() throwing when stop() is called without any input.
+  if (!rowType_.has_value()) {
+    return arrow::Status::OK();
+  }
+
+  int32_t rowNum = 0;
+  const int32_t maxBatchNum = options_.bufferSize;
+  auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value());
+  std::ostringstream output;
+  facebook::velox::OStreamOutputStream out(&output);
+
+  if (options_.partitioning != Partitioning::kSingle) {
+    if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) {
+      // Bind by reference: the list holds one entry per buffered row of this partition.
+      const auto& rowVectorIndex = it->second;
+
+      // Each entry encodes (index into batches_ << 32) | row index. Runs of consecutive rows from
+      // the same batch are collapsed into IndexRanges, grouped by batch index.
+      std::map<int32_t, std::vector<facebook::velox::IndexRange>> groupedIndices;
+      std::map<int32_t, int64_t> groupedSize;
+
+      // init() pre-creates a list for every partition, so it can be empty. Without this check the
+      // trailing flush below would record a range for batch index -1 and read batches_[-1].
+      if (!rowVectorIndex.empty()) {
+        int32_t tempVectorIndex = -1;
+        int32_t baseRowIndex = -1;
+        int32_t tempRowIndex = -1;
+        int32_t size = 1;
+        for (const int64_t rowVector : rowVectorIndex) {
+          const int32_t vectorIndex = static_cast<int32_t>(rowVector >> 32);
+          const int32_t rowIndex = static_cast<int32_t>(rowVector & 0xFFFFFFFFLL);
+          if (tempVectorIndex == -1) {
+            tempVectorIndex = vectorIndex;
+            baseRowIndex = rowIndex;
+            tempRowIndex = rowIndex;
+          } else if (vectorIndex == tempVectorIndex && rowIndex == tempRowIndex + 1) {
+            size += 1;
+            tempRowIndex = rowIndex;
+          } else {
+            groupedIndices[tempVectorIndex].push_back({baseRowIndex, size});
+            groupedSize[tempVectorIndex] += size;
+            size = 1;
+            tempVectorIndex = vectorIndex;
+            baseRowIndex = rowIndex;
+            tempRowIndex = rowIndex;
+          }
+        }
+        groupedIndices[tempVectorIndex].push_back({baseRowIndex, size});
+        groupedSize[tempVectorIndex] += size;
+      }
+
+      for (auto& [vectorIndex, ranges] : groupedIndices) {
+        batch_->append(batches_[vectorIndex], ranges);
+        rowNum += groupedSize[vectorIndex];
+        if (rowNum >= maxBatchNum) {
+          rowNum = 0;
+          RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+        }
+      }
+
+      // Erase by key rather than by `it`, which is not guaranteed to survive the work above.
+      rowVectorIndexMap_.erase(partitionId);
+    }
+  } else {
+    for (facebook::velox::RowVectorPtr rowVectorPtr : batches_) {
+      rowNum += rowVectorPtr->size();
+      batch_->append(rowVectorPtr);
+      if (rowNum >= maxBatchNum) {
+        RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+        rowNum = 0;
+      }
+    }
+  }
+  if (rowNum > 0) {
+    RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+  }
+  return arrow::Status::OK();
+}
+
+// Flushes every partition's remaining rows, stops the partition writer (collecting metrics_),
+// and logs timing stats when VELOX_SHUFFLE_WRITER_LOG_FLAG is enabled.
+arrow::Status VeloxSortBasedShuffleWriter::stop() {
+  {
+    // The sort state is kSortInit here, so without the guard a reclaimFixedSize() triggered by an
+    // allocation during eviction would evict again and clear batches_ mid-iteration.
+    EvictGuard evictGuard{evictState_};
+    for (auto pid = 0; pid < numPartitions(); ++pid) {
+      RETURN_NOT_OK(evictRowVector(pid));
+      partition2RowCount_[pid] = 0;
+    }
+    batches_.clear();
+    currentInputColumnBytes_ = 0;
+  }
+  {
+    SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
+    setSortState(SortState::kSortStop);
+    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    partitionBuffers_.clear();
+  }
+
+  stat();
+
+  return arrow::Status::OK();
+}
+
+// Lazily captures the schema from the first row vector seen and builds the serializer group
+// (batch_) with compression taken from the partition writer options. Later calls do nothing.
+arrow::Status VeloxSortBasedShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) {
+  if (rowType_.has_value()) {
+    return arrow::Status::OK();
+  }
+
+  rowType_ = rv.type();
+  const auto compressionKind =
+      facebook::velox::common::stringToCompressionKind(partitionWriter_->options().compressionTypeStr);
+  serdeOptions_ = {false, compressionKind};
+
+  batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
+  const auto rowType = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value());
+  batch_->createStreamTree(rowType, options_.bufferSize, &serdeOptions_);
+  return arrow::Status::OK();
+}
+
+// Memory-reclaim hook. When the writer is between batches (kSortInit) and no eviction is already
+// running, evicts all buffered partitions and reports the released estimate via *actual.
+// In every other case *actual is 0.
+// NOTE(review): the requested `size` is not consulted; everything buffered is evicted.
+arrow::Status VeloxSortBasedShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
+  // Always report something: callers read *actual even when nothing could be reclaimed.
+  *actual = 0;
+  if (evictState_ == EvictState::kUnevictable) {
+    return arrow::Status::OK();
+  }
+  EvictGuard evictGuard{evictState_};
+
+  if (sortState_ == SortState::kSortInit) {
+    for (auto pid = 0; pid < numPartitions(); ++pid) {
+      RETURN_NOT_OK(evictRowVector(pid));
+      partition2RowCount_[pid] = 0;
+    }
+    batches_.clear();
+    *actual = currentInputColumnBytes_;
+    currentInputColumnBytes_ = 0;
+  }
+  return arrow::Status::OK();
+}
+
+// Logs each CPU/wall timer (with per-call averages when it fired at least once).
+// Compiled out unless VELOX_SHUFFLE_WRITER_LOG_FLAG is non-zero.
+void VeloxSortBasedShuffleWriter::stat() const {
+#if VELOX_SHUFFLE_WRITER_LOG_FLAG
+  for (int type = CpuWallTimingBegin; type != CpuWallTimingEnd; ++type) {
+    const auto& timing = cpuWallTimingList_[type];
+    std::ostringstream line;
+    line << "Velox shuffle writer stat:" << CpuWallTimingName(static_cast<CpuWallTimingType>(type)) << " "
+         << timing.toString();
+    if (timing.count > 0) {
+      line << " wallNanos-avg:" << timing.wallNanos / timing.count << " cpuNanos-avg:"
+           << timing.cpuNanos / timing.count;
+    }
+    LOG(INFO) << line.str();
+  }
+#endif
+}
+
+// Records the current write phase; reclaimFixedSize() only evicts while it is kSortInit.
+void VeloxSortBasedShuffleWriter::setSortState(SortState state) {
+  this->sortState_ = state;
+}
+
+} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h 
b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
new file mode 100644
index 000000000..e3ac07dfc
--- /dev/null
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "velox/common/time/CpuWallTimer.h"
+#include "velox/serializers/PrestoSerializer.h"
+#include "velox/type/Type.h"
+#include "velox/vector/ComplexVector.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/VectorStream.h"
+
+#include <arrow/array/util.h>
+#include <arrow/ipc/writer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/record_batch.h>
+#include <arrow/result.h>
+#include <arrow/type.h>
+
+#include "VeloxShuffleWriter.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/PartitionWriter.h"
+#include "shuffle/Partitioner.h"
+#include "shuffle/Utils.h"
+
+#include "utils/Print.h"
+
+namespace gluten {
+
+// Phase of VeloxSortBasedShuffleWriter's write cycle.
+enum SortState {
+  kSortInit, // between batches; buffered data may be reclaimed
+  kSort, // partition ids of the incoming batch are being computed
+  kSortStop, // stop() is finalizing the partition writer
+};
+
+// Shuffle writer that buffers whole input row vectors and, on eviction, serializes each
+// partition's rows through a Presto VectorStreamGroup before handing the bytes to the
+// PartitionWriter.
+class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
+ public:
+  // Builds the writer and runs init(); returns init()'s error status on failure.
+  static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
+      uint32_t numPartitions,
+      std::unique_ptr<PartitionWriter> partitionWriter,
+      ShuffleWriterOptions options,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      arrow::MemoryPool* arrowPool);
+
+  // Buffers one batch; evicts all partitions once the partition writer's sortBufferMaxSize is exceeded.
+  arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;
+
+  // Flushes all buffered rows and stops the partition writer.
+  arrow::Status stop() override;
+
+  // Evicts everything buffered when possible and reports the released estimate in *actual.
+  arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
+
+  // Serializes and writes all buffered rows of one partition.
+  arrow::Status evictRowVector(uint32_t partitionId) override;
+
+  // Flushes batch_ for `partitionId` through `output`/`out` and re-creates it with `*rowTypePtr`.
+  arrow::Status evictBatch(
+      uint32_t partitionId,
+      std::ostringstream* output,
+      facebook::velox::OStreamOutputStream* out,
+      facebook::velox::RowTypePtr* rowTypePtr);
+
+ private:
+  VeloxSortBasedShuffleWriter(
+      uint32_t numPartitions,
+      std::unique_ptr<PartitionWriter> partitionWriter,
+      ShuffleWriterOptions options,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      arrow::MemoryPool* pool)
+      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {}
+
+  arrow::Status init();
+
+  arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
+
+  void setSortState(SortState state);
+
+  arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit);
+
+  void stat() const;
+
+  // Schema of the buffered row vectors, captured from the first batch by initFromRowVector().
+  std::optional<facebook::velox::TypePtr> rowType_;
+
+  // Serializer group accumulating rows of one partition until evictBatch() flushes it.
+  std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
+
+  // Per-partition row counter (subscript: partition ID). Sized in init() and reset to 0 after
+  // each eviction. NOTE(review): nothing in this class increments it.
+  std::vector<uint32_t> partition2RowCount_;
+
+  std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde> serde_ =
+      std::make_unique<facebook::velox::serializer::presto::PrestoVectorSerde>();
+
+  // Input row vectors buffered since the last eviction, in arrival order.
+  std::vector<facebook::velox::RowVectorPtr> batches_;
+
+  // Partition ID -> that partition's rows, each encoded as (index into batches_ << 32) | row index.
+  std::unordered_map<int32_t, std::vector<int64_t>> rowVectorIndexMap_;
+
+  // Sum of estimateFlatSize() over batches_. Kept 64-bit so that sort buffer limits above 4 GiB
+  // cannot make the counter wrap (it is compared against an int64_t limit).
+  int64_t currentInputColumnBytes_ = 0;
+
+  SortState sortState_{kSortInit};
+}; // class VeloxSortBasedShuffleWriter
+
+} // namespace gluten
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index ffda945b1..fdf3e4491 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -19,7 +19,9 @@
 #include <arrow/io/api.h>
 
 #include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/VeloxHashBasedShuffleWriter.h"
 #include "shuffle/VeloxShuffleWriter.h"
+#include "shuffle/VeloxSortBasedShuffleWriter.h"
 #include "shuffle/rss/RssPartitionWriter.h"
 #include "utils/TestUtils.h"
 #include "utils/VeloxArrowUtils.h"
@@ -68,12 +70,18 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
   std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
 
   for (const auto& compression : compressions) {
+    params.push_back(ShuffleTestParams{ShuffleWriterType::kSortShuffle, 
PartitionWriterType::kRss, compression, 0, 0});
     for (const auto compressionThreshold : compressionThresholds) {
       for (const auto mergeBufferSize : mergeBufferSizes) {
-        params.push_back(
-            ShuffleTestParams{PartitionWriterType::kLocal, compression, 
compressionThreshold, mergeBufferSize});
+        params.push_back(ShuffleTestParams{
+            ShuffleWriterType::kHashShuffle,
+            PartitionWriterType::kLocal,
+            compression,
+            compressionThreshold,
+            mergeBufferSize});
       }
-      params.push_back(ShuffleTestParams{PartitionWriterType::kRss, 
compression, compressionThreshold, 0});
+      params.push_back(ShuffleTestParams{
+          ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, 
compression, compressionThreshold, 0});
     }
   }
 
@@ -264,7 +272,9 @@ TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
   auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
   // calculate maxBatchSize_
   ASSERT_NOT_OK(splitRowVector(*shuffleWriter, hashInputVector1_));
-  VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
+  if (GetParam().shuffleWriterType == kHashShuffle) {
+    VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
+  }
 
   auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
   auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
@@ -305,6 +315,9 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
+  if (GetParam().shuffleWriterType == kSortShuffle) {
+    return;
+  }
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferReallocThreshold = 0; // Force re-alloc on 
buffer size changed.
   auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
@@ -392,6 +405,9 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
preAllocForceReuse) {
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
+  if (GetParam().shuffleWriterType == kSortShuffle) {
+    return;
+  }
   ASSERT_NOT_OK(initShuffleWriterOptions());
   auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
 
diff --git a/cpp/velox/utils/tests/LocalRssClient.h 
b/cpp/velox/utils/tests/LocalRssClient.h
index 0033526bb..c5c1b5d2c 100644
--- a/cpp/velox/utils/tests/LocalRssClient.h
+++ b/cpp/velox/utils/tests/LocalRssClient.h
@@ -30,7 +30,7 @@ class LocalRssClient : public RssClient {
  public:
   LocalRssClient(std::string dataFile) : dataFile_(dataFile) {}
 
-  int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t size) {
+  int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t 
size) {
     auto idx = -1;
     auto maybeIdx = partitionIdx_.find(partitionId);
     if (maybeIdx == partitionIdx_.end()) {
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h 
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index 972c0cb25..66732c97a 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -62,6 +62,7 @@ std::unique_ptr<PartitionWriter> createPartitionWriter(
 } // namespace
 
 struct ShuffleTestParams {
+  ShuffleWriterType shuffleWriterType;
   PartitionWriterType partitionWriterType;
   arrow::Compression::type compressionType;
   int32_t compressionThreshold;
@@ -69,8 +70,9 @@ struct ShuffleTestParams {
 
   std::string toString() const {
     std::ostringstream out;
-    out << "partitionWriterType = " << partitionWriterType << ", 
compressionType = " << compressionType
-        << ", compressionThreshold = " << compressionThreshold << ", 
mergeBufferSize = " << mergeBufferSize;
+    out << "shuffleWriterType = " << shuffleWriterType << ", 
partitionWriterType = " << partitionWriterType
+        << ", compressionType = " << compressionType << ", 
compressionThreshold = " << compressionThreshold
+        << ", mergeBufferSize = " << mergeBufferSize;
     return out.str();
   }
 };
@@ -179,7 +181,7 @@ class VeloxShuffleWriterTestBase : public 
facebook::velox::test::VectorTestBase
 
   arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, 
facebook::velox::RowVectorPtr vector) {
     std::shared_ptr<ColumnarBatch> cb = 
std::make_shared<VeloxColumnarBatch>(vector);
-    return shuffleWriter.split(cb, ShuffleWriter::kMinMemLimit);
+    return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit);
   }
 
   // Create multiple local dirs and join with comma.
@@ -231,11 +233,47 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
 
     ShuffleTestParams params = GetParam();
     partitionWriterOptions_.compressionType = params.compressionType;
+    switch (partitionWriterOptions_.compressionType) {
+      case arrow::Compression::UNCOMPRESSED:
+        partitionWriterOptions_.compressionTypeStr = "none";
+        break;
+      case arrow::Compression::LZ4_FRAME:
+        partitionWriterOptions_.compressionTypeStr = "lz4";
+        break;
+      case arrow::Compression::ZSTD:
+        partitionWriterOptions_.compressionTypeStr = "zstd";
+        break;
+      default:
+        break;
+    };
     partitionWriterOptions_.compressionThreshold = params.compressionThreshold;
     partitionWriterOptions_.mergeBufferSize = params.mergeBufferSize;
     return arrow::Status::OK();
   }
 
+  // Builds the writer selected by the test parameters. The hash-based writer uses `bufferSize`;
+  // the sort-based writer is only created together with an RSS partition writer. Any other
+  // combination yields a null pointer.
+  std::shared_ptr<VeloxShuffleWriter> createSpecificShuffleWriter(
+      arrow::MemoryPool* arrowPool,
+      std::unique_ptr<PartitionWriter> partitionWriter,
+      ShuffleWriterOptions shuffleWriterOptions,
+      uint32_t numPartitions,
+      int32_t bufferSize) {
+    const auto& params = GetParam();
+    std::shared_ptr<VeloxShuffleWriter> writer;
+    if (params.shuffleWriterType == kHashShuffle) {
+      shuffleWriterOptions.bufferSize = bufferSize;
+      GLUTEN_ASSIGN_OR_THROW(
+          writer,
+          VeloxHashBasedShuffleWriter::create(
+              numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions), pool_, arrowPool));
+      return writer;
+    }
+    const bool sortOverRss =
+        params.shuffleWriterType == kSortShuffle && params.partitionWriterType == PartitionWriterType::kRss;
+    if (sortOverRss) {
+      GLUTEN_ASSIGN_OR_THROW(
+          writer,
+          VeloxSortBasedShuffleWriter::create(
+              numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions), pool_, arrowPool));
+    }
+    return writer;
+  }
+
  protected:
   static void SetUpTestCase() {
     facebook::velox::memory::MemoryManager::testingSetInstance({});
@@ -276,9 +314,33 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
     options.compressionType = compressionType;
     auto codec = createArrowIpcCodec(options.compressionType, 
CodecBackend::NONE);
     auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
+    switch (options.compressionType) {
+      case arrow::Compression::type::UNCOMPRESSED:
+        options.compressionTypeStr = "none";
+        break;
+      case arrow::Compression::type::LZ4_FRAME:
+        options.compressionTypeStr = "lz4";
+        break;
+      case arrow::Compression::type::ZSTD:
+        options.compressionTypeStr = "zstd";
+        break;
+      default:
+        break;
+    };
+    auto veloxCompressionType = 
facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
+    if (!facebook::velox::isRegisteredVectorSerde()) {
+      
facebook::velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
+    }
     // Set batchSize to a large value to make all batches are merged by reader.
     auto deserializerFactory = 
std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>(
-        schema, std::move(codec), rowType, 
std::numeric_limits<int32_t>::max(), defaultArrowMemoryPool().get(), pool_);
+        schema,
+        std::move(codec),
+        veloxCompressionType,
+        rowType,
+        std::numeric_limits<int32_t>::max(),
+        defaultArrowMemoryPool().get(),
+        pool_,
+        GetParam().shuffleWriterType);
     auto reader = 
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
     auto iter = reader->readStream(in);
     while (iter->hasNext()) {
@@ -316,15 +378,12 @@ class SinglePartitioningShuffleWriter : public 
VeloxShuffleWriterTest {
   }
 
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    shuffleWriterOptions_.bufferSize = 10;
     shuffleWriterOptions_.partitioning = Partitioning::kSingle;
     static const uint32_t kNumPartitions = 1;
     auto partitionWriter = createPartitionWriter(
         GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    GLUTEN_ASSIGN_OR_THROW(
-        auto shuffleWriter,
-        VeloxShuffleWriter::create(
-            kNumPartitions, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
+        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 10);
     return shuffleWriter;
   }
 };
@@ -387,15 +446,12 @@ class HashPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
   }
 
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    shuffleWriterOptions_.bufferSize = 4;
     shuffleWriterOptions_.partitioning = Partitioning::kHash;
     static const uint32_t kNumPartitions = 2;
     auto partitionWriter = createPartitionWriter(
         GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    GLUTEN_ASSIGN_OR_THROW(
-        auto shuffleWriter,
-        VeloxShuffleWriter::create(
-            kNumPartitions, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
+        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 4);
     return shuffleWriter;
   }
 
@@ -422,15 +478,12 @@ class RangePartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter
   }
 
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    shuffleWriterOptions_.bufferSize = 4;
     shuffleWriterOptions_.partitioning = Partitioning::kRange;
     static const uint32_t kNumPartitions = 2;
     auto partitionWriter = createPartitionWriter(
         GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    GLUTEN_ASSIGN_OR_THROW(
-        auto shuffleWriter,
-        VeloxShuffleWriter::create(
-            kNumPartitions, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
+        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 4);
     return shuffleWriter;
   }
 
@@ -441,7 +494,7 @@ class RangePartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter
       facebook::velox::TypePtr dataType,
       std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) 
{ /* blockId = pid, rowVector in block */
     for (auto& batch : batches) {
-      ASSERT_NOT_OK(shuffleWriter.split(batch, ShuffleWriter::kMinMemLimit));
+      ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
     }
     shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, 
dataType, expectedVectors);
   }
@@ -453,14 +506,11 @@ class RangePartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter
 class RoundRobinPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
  protected:
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    shuffleWriterOptions_.bufferSize = 4;
     static const uint32_t kNumPartitions = 2;
     auto partitionWriter = createPartitionWriter(
         GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    GLUTEN_ASSIGN_OR_THROW(
-        auto shuffleWriter,
-        VeloxShuffleWriter::create(
-            kNumPartitions, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), pool_, arrowPool));
+    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
+        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 4);
     return shuffleWriter;
   }
 };
@@ -479,7 +529,7 @@ class VeloxShuffleWriterMemoryTest : public 
VeloxShuffleWriterTestBase, public t
         PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
-        VeloxShuffleWriter::create(
+        VeloxHashBasedShuffleWriter::create(
             numPartitions, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), pool_, arrowPool));
     return shuffleWriter;
   }
diff --git 
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index a1a41f973..f454cf00c 100644
--- 
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Iterators;
 import org.apache.celeborn.client.LifecycleManager;
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.protocol.ShuffleMode;
 import org.apache.spark.*;
 import org.apache.spark.shuffle.*;
 import org.apache.spark.shuffle.celeborn.*;
@@ -291,10 +290,6 @@ public class CelebornShuffleManager implements ShuffleManager {
           shuffleId = h.dependency().shuffleId();
         }
 
-        if (!ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
-          throw new UnsupportedOperationException(
-              "Unrecognized shuffle write mode!" + 
celebornConf.shuffleWriterMode());
-        }
         if (h.dependency() instanceof ColumnarShuffleDependency) {
           // columnar-based shuffle
           return writerFactory.createShuffleWriterInstance(
diff --git a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
index 292ff3cc1..efd891498 100644
--- a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
+++ b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
@@ -29,6 +29,7 @@ import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
 
 import java.io.IOException
+import java.util.Locale
 
 abstract class CelebornHashBasedColumnarShuffleWriter[K, V](
     shuffleId: Int,
@@ -53,6 +54,13 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](
 
   protected val clientPushBufferMaxSize: Int = 
celebornConf.clientPushBufferMaxSize
 
+  protected val clientPushSortMemoryThreshold: Long = 
celebornConf.clientPushSortMemoryThreshold
+
+  protected val clientSortMemoryMaxSize: Long = 
celebornConf.clientPushSortMemoryThreshold
+
+  protected val shuffleWriterType: String =
+    celebornConf.shuffleWriterMode.name.toLowerCase(Locale.ROOT)
+
   protected val celebornPartitionPusher = new CelebornPartitionPusher(
     shuffleId,
     numMappers,
diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index d72977f59..699626db1 100644
--- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -39,6 +39,7 @@ import org.apache.celeborn.client.read.CelebornInputStream
 
 import java.io._
 import java.nio.ByteBuffer
+import java.util.Locale
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -83,6 +84,8 @@ private class CelebornColumnarBatchSerializerInstance(
       }
     val compressionCodecBackend =
       GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
+    val shuffleWriterType =
+      conf.get("spark.celeborn.client.spark.shuffle.writer", 
"hash").toLowerCase(Locale.ROOT)
     val jniWrapper = ShuffleReaderJniWrapper.create()
     val batchSize = GlutenConfig.getConf.maxBatchSize
     val handle = jniWrapper
@@ -91,7 +94,8 @@ private class CelebornColumnarBatchSerializerInstance(
         nmm.getNativeInstanceHandle,
         compressionCodec,
         compressionCodecBackend,
-        batchSize
+        batchSize,
+        shuffleWriterType
       )
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
index 75d5148cd..37ea11a73 100644
--- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
+++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
@@ -69,6 +69,12 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
     }
   }
 
+  private val memoryLimit: Long = if ("sort".equals(shuffleWriterType)) {
+    Math.min(clientSortMemoryMaxSize, clientPushBufferMaxSize * numPartitions)
+  } else {
+    availableOffHeapPerTask()
+  }
+
   private def availableOffHeapPerTask(): Long = {
     val perTask =
       SparkMemoryUtil.getCurrentAvailableOffHeapMemory / 
SparkResourceUtil.getTaskSlots(conf)
@@ -97,6 +103,7 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
             bufferCompressThreshold,
             GlutenConfig.getConf.columnarShuffleCompressionMode,
             clientPushBufferMaxSize,
+            clientPushSortMemoryThreshold,
             celebornPartitionPusher,
             NativeMemoryManagers
               .create(
@@ -127,11 +134,12 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
             context.taskAttemptId(),
             GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, 
context.partitionId),
             "celeborn",
+            shuffleWriterType,
             GlutenConfig.getConf.columnarShuffleReallocThreshold
           )
         }
         val startTime = System.nanoTime()
-        jniWrapper.split(nativeShuffleWriter, cb.numRows, handle, 
availableOffHeapPerTask())
+        jniWrapper.write(nativeShuffleWriter, cb.numRows, handle, 
availableOffHeapPerTask())
         dep.metrics("splitTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(cb.numRows)
         dep.metrics("inputBatches").add(1)
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index 411907ae3..24425ccf7 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -41,7 +41,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
       long memoryManagerHandle,
       String compressionType,
       String compressionCodecBackend,
-      int batchSize);
+      int batchSize,
+      String shuffleWriterType);
 
   public native long readStream(long shuffleReaderHandle, JniByteInputStream 
jniIn);
 
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index 243c90599..ed312fa14 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -90,8 +90,10 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
         taskAttemptId,
         startPartitionId,
         0,
+        0,
         null,
-        "local");
+        "local",
+        "hash");
   }
 
   /**
@@ -110,12 +112,14 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
       int bufferCompressThreshold,
       String compressionMode,
       int pushBufferMaxSize,
+      long sortBufferMaxSize,
       Object pusher,
       long memoryManagerHandle,
       long handle,
       long taskAttemptId,
       int startPartitionId,
       String partitionWriterType,
+      String shuffleWriterType,
       double reallocThreshold) {
     return nativeMake(
         part.getShortName(),
@@ -137,8 +141,10 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
         taskAttemptId,
         startPartitionId,
         pushBufferMaxSize,
+        sortBufferMaxSize,
         pusher,
-        partitionWriterType);
+        partitionWriterType,
+        shuffleWriterType);
   }
 
   public native long nativeMake(
@@ -161,8 +167,10 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
       long taskAttemptId,
       int startPartitionId,
       int pushBufferMaxSize,
+      long sortBufferMaxSize,
       Object pusher,
-      String partitionWriterType);
+      String partitionWriterType,
+      String shuffleWriterType);
 
   /**
    * Evict partition data.
@@ -187,7 +195,7 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
    *     allocator instead
    * @return batch bytes.
    */
-  public native long split(long shuffleWriterHandle, int numRows, long 
handler, long memLimit);
+  public native long write(long shuffleWriterHandle, int numRows, long 
handler, long memLimit);
 
   /**
    * Write the data remained in the buffers hold by native shuffle writer to 
each partition's
diff --git a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index e632700e3..69e9aa9c9 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -102,7 +102,9 @@ private class ColumnarBatchSerializerInstance(
       nmm.getNativeInstanceHandle,
       compressionCodec,
       compressionCodecBackend,
-      batchSize)
+      batchSize,
+      "hash"
+    )
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
     // was used to create all buffers read from shuffle reader. The pool
diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index fb933866c..c797257f1 100644
--- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -181,7 +181,7 @@ class ColumnarShuffleWriter[K, V](
           )
         }
         val startTime = System.nanoTime()
-        jniWrapper.split(nativeShuffleWriter, rows, handle, 
availableOffHeapPerTask())
+        jniWrapper.write(nativeShuffleWriter, rows, handle, 
availableOffHeapPerTask())
         dep.metrics("splitTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(rows)
         dep.metrics("inputBatches").add(1)
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 17cfce1c0..c0063c6f4 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -145,6 +145,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
                   compressThreshold,
                   GlutenConfig.getConf().columnarShuffleCompressionMode(),
                   bufferSize,
+                  bufferSize,
                   partitionPusher,
                   NativeMemoryManagers.create(
                           "UniffleShuffleWriter",
@@ -180,12 +181,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
                   GlutenShuffleUtils.getStartPartitionId(
                       columnarDep.nativePartitioning(), partitionId),
                   "uniffle",
+                  "hash",
                   reallocThreshold);
         }
         long startTime = System.nanoTime();
         long bytes =
-            jniWrapper.split(nativeShuffleWriter, cb.numRows(), handle, 
availableOffHeapPerTask());
-        LOG.debug("jniWrapper.split rows {}, split bytes {}", cb.numRows(), 
bytes);
+            jniWrapper.write(nativeShuffleWriter, cb.numRows(), handle, 
availableOffHeapPerTask());
+        LOG.debug("jniWrapper.write rows {}, split bytes {}", cb.numRows(), 
bytes);
         columnarDep.metrics().get("dataSize").get().add(bytes);
         // this metric replace part of uniffle shuffle write time
         columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - 
startTime);
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ca8a9dce1..02c6bf7fe 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -134,6 +134,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
       .getConfString("spark.shuffle.manager", "sort")
       .contains("UniffleShuffleManager")
 
+  def isSortBasedCelebornShuffle: Boolean =
+    conf
+      .getConfString("spark.celeborn.client.spark.shuffle.writer", "hash")
+      .equals("sort")
+
   def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
 
   def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to