This is an automated email from the ASF dual-hosted git repository. marong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new dfac04f6e [VL] Support celeborn sort based shuffle (#5675) dfac04f6e is described below commit dfac04f6e9f471a67248a2b9a2e582c9d6f22597 Author: Kerwin Zhang <xiyu...@alibaba-inc.com> AuthorDate: Fri May 17 13:44:54 2024 +0800 [VL] Support celeborn sort based shuffle (#5675) --- cpp/core/jni/JniCommon.h | 18 +- cpp/core/jni/JniWrapper.cc | 30 +- cpp/core/shuffle/FallbackRangePartitioner.cc | 18 + cpp/core/shuffle/FallbackRangePartitioner.h | 6 + cpp/core/shuffle/HashPartitioner.cc | 49 ++- cpp/core/shuffle/HashPartitioner.h | 6 + cpp/core/shuffle/LocalPartitionWriter.cc | 4 + cpp/core/shuffle/LocalPartitionWriter.h | 2 + cpp/core/shuffle/Options.h | 16 +- cpp/core/shuffle/PartitionWriter.h | 6 + cpp/core/shuffle/Partitioner.h | 9 + cpp/core/shuffle/RoundRobinPartitioner.cc | 16 + cpp/core/shuffle/RoundRobinPartitioner.h | 6 + cpp/core/shuffle/ShuffleReader.cc | 4 + cpp/core/shuffle/ShuffleReader.h | 6 +- cpp/core/shuffle/ShuffleWriter.h | 12 +- cpp/core/shuffle/SinglePartitioner.cc | 9 + cpp/core/shuffle/SinglePartitioner.h | 6 + cpp/core/shuffle/rss/RssClient.h | 2 +- cpp/core/shuffle/rss/RssPartitionWriter.cc | 7 + cpp/core/shuffle/rss/RssPartitionWriter.h | 2 + cpp/velox/CMakeLists.txt | 3 +- cpp/velox/benchmarks/GenericBenchmark.cc | 5 +- cpp/velox/benchmarks/ShuffleSplitBenchmark.cc | 9 +- cpp/velox/compute/VeloxRuntime.cc | 33 +- ...fleWriter.cc => VeloxHashBasedShuffleWriter.cc} | 161 +++----- ...uffleWriter.h => VeloxHashBasedShuffleWriter.h} | 92 +---- cpp/velox/shuffle/VeloxShuffleReader.cc | 115 +++++- cpp/velox/shuffle/VeloxShuffleReader.h | 51 ++- cpp/velox/shuffle/VeloxShuffleWriter.h | 405 ++++----------------- cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 317 ++++++++++++++++ cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h | 117 ++++++ cpp/velox/tests/VeloxShuffleWriterTest.cc | 24 +- cpp/velox/utils/tests/LocalRssClient.h | 2 +- cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 102 ++++-- .../gluten/celeborn/CelebornShuffleManager.java | 5 - .../CelebornHashBasedColumnarShuffleWriter.scala | 8 + .../VeloxCelebornColumnarBatchSerializer.scala | 6 +- ...loxCelebornHashBasedColumnarShuffleWriter.scala | 10 +- .../gluten/vectorized/ShuffleReaderJniWrapper.java | 3 +- .../gluten/vectorized/ShuffleWriterJniWrapper.java | 16 +- .../vectorized/ColumnarBatchSerializer.scala | 4 +- .../spark/shuffle/ColumnarShuffleWriter.scala | 2 +- .../writer/VeloxUniffleColumnarShuffleWriter.java | 6 +- .../scala/org/apache/gluten/GlutenConfig.scala | 5 + 45 files changed, 1104 insertions(+), 631 deletions(-) diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index 29c38689c..aa3b2b884 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -280,6 +280,20 @@ static inline arrow::Compression::type getCompressionType(JNIEnv* env, jstring c return compressionType; } +static inline const std::string getCompressionTypeStr(JNIEnv* env, jstring codecJstr) { + if (codecJstr == NULL) { + return "none"; + } + auto codec = env->GetStringUTFChars(codecJstr, JNI_FALSE); + + // Convert codec string into lowercase. + std::string codecLower; + std::transform(codec, codec + std::strlen(codec), std::back_inserter(codecLower), ::tolower); + + env->ReleaseStringUTFChars(codecJstr, codec); + return codecLower; +} + static inline gluten::CodecBackend getCodecBackend(JNIEnv* env, jstring codecJstr) { if (codecJstr == nullptr) { return gluten::CodecBackend::NONE; @@ -444,7 +458,7 @@ class JavaRssClient : public RssClient { env->DeleteGlobalRef(array_); } - int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t size) override { + int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) override { JNIEnv* env; if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) { throw gluten::GlutenException("JNIEnv was not attached to current thread"); @@ -457,7 +471,7 @@ class JavaRssClient : public RssClient { array_ = env->NewByteArray(size); array_ = static_cast<jbyteArray>(env->NewGlobalRef(array_)); } - env->SetByteArrayRegion(array_, 0, size, reinterpret_cast<jbyte*>(bytes)); + env->SetByteArrayRegion(array_, 0, size, (jbyte*)bytes); jint javaBytesSize = env->CallIntMethod(javaRssShuffleWriter_, javaPushPartitionData_, partitionId, array_, size); checkException(env); return static_cast<int32_t>(javaBytesSize); diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 7363a9da0..e70a017e0 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -831,8 +831,10 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jlong taskAttemptId, jint startPartitionId, jint pushBufferMaxSize, + jlong sortBufferMaxSize, jobject partitionPusher, - jstring partitionWriterTypeJstr) { + jstring partitionWriterTypeJstr, + jstring shuffleWriterTypeJstr) { JNI_METHOD_START auto ctx = gluten::getRuntime(env, wrapper); auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle); @@ -866,10 +868,12 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe .mergeThreshold = mergeThreshold, .compressionThreshold = compressionThreshold, .compressionType = getCompressionType(env, codecJstr), + .compressionTypeStr = getCompressionTypeStr(env, codecJstr), .compressionLevel = compressionLevel, .bufferedWrite = true, .numSubDirs = numSubDirs, - .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize : kDefaultShuffleWriterBufferSize}; + .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize : kDefaultPushMemoryThreshold, + .sortBufferMaxSize = sortBufferMaxSize > 0 ? sortBufferMaxSize : kDefaultSortBufferThreshold}; if (codecJstr != NULL) { partitionWriterOptions.codecBackend = getCodecBackend(env, codecBackendJstr); partitionWriterOptions.compressionMode = getCompressionMode(env, compressionModeJstr); @@ -879,6 +883,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr, JNI_FALSE); auto partitionWriterType = std::string(partitionWriterTypeC); env->ReleaseStringUTFChars(partitionWriterTypeJstr, partitionWriterTypeC); + + auto shuffleWriterTypeC = env->GetStringUTFChars(shuffleWriterTypeJstr, JNI_FALSE); + auto shuffleWriterType = std::string(shuffleWriterTypeC); + env->ReleaseStringUTFChars(shuffleWriterTypeJstr, shuffleWriterTypeC); + + if (shuffleWriterType == "sort") { + shuffleWriterOptions.shuffleWriterType = kSortShuffle; + } + if (partitionWriterType == "local") { if (dataFileJstr == NULL) { throw gluten::GlutenException(std::string("Shuffle DataFile can't be null")); @@ -962,7 +975,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe JNI_METHOD_END(kInvalidResourceHandle) } -JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_split( // NOLINT +JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_write( // NOLINT JNIEnv* env, jobject wrapper, jlong shuffleWriterHandle, @@ -981,7 +994,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe // The column batch maybe VeloxColumnBatch or ArrowCStructColumnarBatch(FallbackRangeShuffleWriter) auto batch = ctx->objectStore()->retrieve<ColumnarBatch>(batchHandle); auto numBytes = batch->numBytes(); - gluten::arrowAssertOkOrThrow(shuffleWriter->split(batch, memLimit), "Native split: shuffle writer split failed"); + gluten::arrowAssertOkOrThrow(shuffleWriter->write(batch, memLimit), "Native write: shuffle writer failed"); return numBytes; JNI_METHOD_END(kInvalidResourceHandle) } @@ -1058,7 +1071,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe jlong memoryManagerHandle, jstring compressionType, jstring compressionBackend, - jint batchSize) { + jint batchSize, + jstring shuffleWriterType) { JNI_METHOD_START auto ctx = gluten::getRuntime(env, wrapper); auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle); @@ -1066,11 +1080,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe auto pool = memoryManager->getArrowMemoryPool(); ShuffleReaderOptions options = ShuffleReaderOptions{}; options.compressionType = getCompressionType(env, compressionType); + options.compressionTypeStr = getCompressionTypeStr(env, compressionType); if (compressionType != nullptr) { options.codecBackend = getCodecBackend(env, compressionBackend); } options.batchSize = batchSize; // TODO: Add coalesce option and maximum coalesced size. + + if (jStringToCString(env, shuffleWriterType) == "sort") { + options.shuffleWriterType = kSortShuffle; + } std::shared_ptr<arrow::Schema> schema = gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct ArrowSchema*>(cSchema))); @@ -1085,7 +1104,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe jobject jniIn) { JNI_METHOD_START auto ctx = gluten::getRuntime(env, wrapper); - auto reader = ctx->objectStore()->retrieve<ShuffleReader>(shuffleReaderHandle); std::shared_ptr<arrow::io::InputStream> in = std::make_shared<JavaInputStreamAdaptor>(env, reader->getPool(), jniIn); auto outItr = reader->readStream(in); diff --git a/cpp/core/shuffle/FallbackRangePartitioner.cc b/cpp/core/shuffle/FallbackRangePartitioner.cc index 4bad50b51..677fcd114 100644 --- a/cpp/core/shuffle/FallbackRangePartitioner.cc +++ b/cpp/core/shuffle/FallbackRangePartitioner.cc @@ -39,4 +39,22 @@ arrow::Status gluten::FallbackRangePartitioner::compute( return arrow::Status::OK(); } +arrow::Status gluten::FallbackRangePartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) { + auto index = static_cast<int64_t>(vectorIndex) << 32; + for (auto i = 0; i < numRows; ++i) { + auto pid = pidArr[i]; + int64_t combined = index | (i & 0xFFFFFFFFLL); + auto& vec = rowVectorIndexMap[pid]; + vec.push_back(combined); + if (pid >= numPartitions_) { + return arrow::Status::Invalid( + "Partition id ", std::to_string(pid), " is equal or greater than ", std::to_string(numPartitions_)); + } + } + return arrow::Status::OK(); +} } // namespace gluten diff --git a/cpp/core/shuffle/FallbackRangePartitioner.h b/cpp/core/shuffle/FallbackRangePartitioner.h index f54dd1abc..b06ce7e17 100644 --- a/cpp/core/shuffle/FallbackRangePartitioner.h +++ b/cpp/core/shuffle/FallbackRangePartitioner.h @@ -30,6 +30,12 @@ class FallbackRangePartitioner final : public Partitioner { const int64_t numRows, std::vector<uint32_t>& row2partition, std::vector<uint32_t>& partition2RowCount) override; + + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) override; }; } // namespace gluten diff --git a/cpp/core/shuffle/HashPartitioner.cc b/cpp/core/shuffle/HashPartitioner.cc index c62e3185f..4a26dc67b 100644 --- a/cpp/core/shuffle/HashPartitioner.cc +++ b/cpp/core/shuffle/HashPartitioner.cc @@ -19,6 +19,24 @@ namespace gluten { +int32_t computePid(const int32_t* pidArr, int64_t i, int32_t numPartitions) { + auto pid = pidArr[i] % numPartitions; +#if defined(__x86_64__) + // force to generate ASM + __asm__( + "lea (%[num_partitions],%[pid],1),%[tmp]\n" + "test %[pid],%[pid]\n" + "cmovs %[tmp],%[pid]\n" + : [pid] "+r"(pid) + : [num_partitions] "r"(numPartitions), [tmp] "r"(0)); +#else + if (pid < 0) { + pid += numPartitions_; + } +#endif + return pid; +} + arrow::Status gluten::HashPartitioner::compute( const int32_t* pidArr, const int64_t numRows, @@ -28,20 +46,7 @@ arrow::Status gluten::HashPartitioner::compute( std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0); for (auto i = 0; i < numRows; ++i) { - auto pid = pidArr[i] % numPartitions_; -#if defined(__x86_64__) - // force to generate ASM - __asm__( - "lea (%[num_partitions],%[pid],1),%[tmp]\n" - "test %[pid],%[pid]\n" - "cmovs %[tmp],%[pid]\n" - : [pid] "+r"(pid) - : [num_partitions] "r"(numPartitions_), [tmp] "r"(0)); -#else - if (pid < 0) { - pid += numPartitions_; - } -#endif + auto pid = computePid(pidArr, i, numPartitions_); row2partition[i] = pid; } @@ -52,4 +57,20 @@ arrow::Status gluten::HashPartitioner::compute( return arrow::Status::OK(); } +arrow::Status gluten::HashPartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) { + auto index = static_cast<int64_t>(vectorIndex) << 32; + for (auto i = 0; i < numRows; ++i) { + auto pid = computePid(pidArr, i, numPartitions_); + int64_t combined = index | (i & 0xFFFFFFFFLL); + auto& vec = rowVectorIndexMap[pid]; + vec.push_back(combined); + } + + return arrow::Status::OK(); +} + } // namespace gluten diff --git a/cpp/core/shuffle/HashPartitioner.h b/cpp/core/shuffle/HashPartitioner.h index fff01f939..6cd664634 100644 --- a/cpp/core/shuffle/HashPartitioner.h +++ b/cpp/core/shuffle/HashPartitioner.h @@ -30,6 +30,12 @@ class HashPartitioner final : public Partitioner { const int64_t numRows, std::vector<uint32_t>& row2partition, std::vector<uint32_t>& partition2RowCount) override; + + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) override; }; } // namespace gluten diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 0582ce0e5..2fa0b954f 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -541,6 +541,10 @@ arrow::Status LocalPartitionWriter::evict( return arrow::Status::OK(); } +arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) { + return arrow::Status::NotImplemented("Invalid code path for local shuffle writer: sort based is not supported."); +} + arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) { // Finish last spiller. RETURN_NOT_OK(finishSpill()); diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h index 2cf4f2fd9..c2bfacd4b 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -42,6 +42,8 @@ class LocalPartitionWriter : public PartitionWriter { bool reuseBuffers, bool hasComplexType) override; + arrow::Status evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) override; + /// The stop function performs several tasks: /// 1. Opens the final data file. /// 2. Iterates over each partition ID (pid) to: diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index d8fe1c802..4317ed631 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -25,18 +25,24 @@ namespace gluten { static constexpr int16_t kDefaultBatchSize = 4096; -static constexpr int16_t kDefaultShuffleWriterBufferSize = 4096; +static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096; +static constexpr int64_t kDefaultSortBufferThreshold = 64000000000; +static constexpr int64_t kDefaultPushMemoryThreshold = 4096; static constexpr int32_t kDefaultNumSubDirs = 64; static constexpr int32_t kDefaultCompressionThreshold = 100; +static const std::string kDefaultCompressionTypeStr = "lz4"; static constexpr int32_t kDefaultBufferAlignment = 64; static constexpr double kDefaultBufferReallocThreshold = 0.25; static constexpr double kDefaultMergeBufferThreshold = 0.25; static constexpr bool kEnableBufferedWrite = true; +enum ShuffleWriterType { kHashShuffle, kSortShuffle }; enum PartitionWriterType { kLocal, kRss }; struct ShuffleReaderOptions { arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME; + std::string compressionTypeStr = "lz4"; + ShuffleWriterType shuffleWriterType = kHashShuffle; CodecBackend codecBackend = CodecBackend::NONE; int32_t batchSize = kDefaultBatchSize; }; @@ -44,18 +50,20 @@ struct ShuffleReaderOptions { struct ShuffleWriterOptions { int32_t bufferSize = kDefaultShuffleWriterBufferSize; double bufferReallocThreshold = kDefaultBufferReallocThreshold; + int64_t pushMemoryThreshold = kDefaultPushMemoryThreshold; Partitioning partitioning = Partitioning::kRoundRobin; int64_t taskAttemptId = -1; int32_t startPartitionId = 0; int64_t threadId = -1; + ShuffleWriterType shuffleWriterType = kHashShuffle; }; struct PartitionWriterOptions { int32_t mergeBufferSize = kDefaultShuffleWriterBufferSize; double mergeThreshold = kDefaultMergeBufferThreshold; - int32_t compressionThreshold = kDefaultCompressionThreshold; arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME; + std::string compressionTypeStr = kDefaultCompressionTypeStr; CodecBackend codecBackend = CodecBackend::NONE; int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel; CompressionMode compressionMode = CompressionMode::BUFFER; @@ -64,7 +72,9 @@ struct PartitionWriterOptions { int32_t numSubDirs = kDefaultNumSubDirs; - int32_t pushBufferMaxSize = kDefaultShuffleWriterBufferSize; + int64_t pushBufferMaxSize = kDefaultPushMemoryThreshold; + + int64_t sortBufferMaxSize = kDefaultSortBufferThreshold; }; struct ShuffleWriterMetrics { diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h index 42e97cf06..93a6d04fe 100644 --- a/cpp/core/shuffle/PartitionWriter.h +++ b/cpp/core/shuffle/PartitionWriter.h @@ -49,10 +49,16 @@ class PartitionWriter : public Reclaimable { bool reuseBuffers, bool hasComplexType) = 0; + virtual arrow::Status evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) = 0; + uint64_t cachedPayloadSize() { return payloadPool_->bytes_allocated(); } + PartitionWriterOptions& options() { + return options_; + } + protected: uint32_t numPartitions_; PartitionWriterOptions options_; diff --git a/cpp/core/shuffle/Partitioner.h b/cpp/core/shuffle/Partitioner.h index 8331b8a91..b233f5b82 100644 --- a/cpp/core/shuffle/Partitioner.h +++ b/cpp/core/shuffle/Partitioner.h @@ -18,7 +18,10 @@ #pragma once #include <arrow/result.h> +#include <folly/container/F14Map.h> + #include <memory> +#include <unordered_map> #include <vector> #include "shuffle/Partitioning.h" @@ -40,6 +43,12 @@ class Partitioner { std::vector<uint32_t>& row2partition, std::vector<uint32_t>& partition2RowCount) = 0; + virtual arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) = 0; + protected: Partitioner(int32_t numPartitions, bool hasPid) : numPartitions_(numPartitions), hasPid_(hasPid) {} diff --git a/cpp/core/shuffle/RoundRobinPartitioner.cc b/cpp/core/shuffle/RoundRobinPartitioner.cc index b00680a18..196f9308d 100644 --- a/cpp/core/shuffle/RoundRobinPartitioner.cc +++ b/cpp/core/shuffle/RoundRobinPartitioner.cc @@ -39,4 +39,20 @@ arrow::Status gluten::RoundRobinPartitioner::compute( return arrow::Status::OK(); } +arrow::Status gluten::RoundRobinPartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) { + auto index = static_cast<int64_t>(vectorIndex) << 32; + for (int32_t i = 0; i < numRows; ++i) { + int64_t combined = index | (i & 0xFFFFFFFFLL); + auto& vec = rowVectorIndexMap[pidSelection_]; + vec.push_back(combined); + pidSelection_ = (pidSelection_ + 1) % numPartitions_; + } + + return arrow::Status::OK(); +} + } // namespace gluten diff --git a/cpp/core/shuffle/RoundRobinPartitioner.h b/cpp/core/shuffle/RoundRobinPartitioner.h index 5afd2832a..126a08eb9 100644 --- a/cpp/core/shuffle/RoundRobinPartitioner.h +++ b/cpp/core/shuffle/RoundRobinPartitioner.h @@ -32,6 +32,12 @@ class RoundRobinPartitioner final : public Partitioner { std::vector<uint32_t>& row2Partition, std::vector<uint32_t>& partition2RowCount) override; + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) override; + private: friend class RoundRobinPartitionerTest; diff --git a/cpp/core/shuffle/ShuffleReader.cc b/cpp/core/shuffle/ShuffleReader.cc index 471409d6d..faa81b522 100644 --- a/cpp/core/shuffle/ShuffleReader.cc +++ b/cpp/core/shuffle/ShuffleReader.cc @@ -48,6 +48,10 @@ int64_t ShuffleReader::getIpcTime() const { return ipcTime_; } +ShuffleWriterType ShuffleReader::getShuffleWriterType() const { + return factory_->getShuffleWriterType(); +} + int64_t ShuffleReader::getDeserializeTime() const { return factory_->getDeserializeTime(); } diff --git a/cpp/core/shuffle/ShuffleReader.h b/cpp/core/shuffle/ShuffleReader.h index 4ba105712..5cef14768 100644 --- a/cpp/core/shuffle/ShuffleReader.h +++ b/cpp/core/shuffle/ShuffleReader.h @@ -39,6 +39,8 @@ class DeserializerFactory { virtual int64_t getDecompressTime() = 0; virtual int64_t getDeserializeTime() = 0; + + virtual ShuffleWriterType getShuffleWriterType() = 0; }; class ShuffleReader { @@ -60,13 +62,15 @@ class ShuffleReader { arrow::MemoryPool* getPool() const; + ShuffleWriterType getShuffleWriterType() const; + protected: arrow::MemoryPool* pool_; int64_t decompressTime_ = 0; int64_t ipcTime_ = 0; int64_t deserializeTime_ = 0; - ShuffleReaderOptions options_; + ShuffleWriterType shuffleWriterType_; private: std::shared_ptr<arrow::Schema> schema_; diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h index bcf0c2c3b..a7987ce3e 100644 --- a/cpp/core/shuffle/ShuffleWriter.h +++ b/cpp/core/shuffle/ShuffleWriter.h @@ -37,7 +37,7 @@ class ShuffleWriter : public Reclaimable { public: static constexpr int64_t kMinMemLimit = 128LL * 1024 * 1024; - virtual arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) = 0; + virtual arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) = 0; virtual arrow::Status stop() = 0; @@ -45,6 +45,10 @@ class ShuffleWriter : public Reclaimable { return numPartitions_; } + ShuffleWriterOptions& options() { + return options_; + } + int64_t partitionBufferSize() const { return partitionBufferPool_->bytes_allocated(); } @@ -81,7 +85,9 @@ class ShuffleWriter : public Reclaimable { return metrics_.rawPartitionLengths; } - virtual const uint64_t cachedPayloadSize() const = 0; + const int64_t rawPartitionBytes() { + return std::accumulate(metrics_.rawPartitionLengths.begin(), metrics_.rawPartitionLengths.end(), 0LL); + } protected: ShuffleWriter( @@ -108,6 +114,8 @@ class ShuffleWriter : public Reclaimable { std::unique_ptr<PartitionWriter> partitionWriter_; + std::vector<int64_t> rowVectorLengths_; + std::shared_ptr<arrow::Schema> schema_; // Column index, partition id, buffers. diff --git a/cpp/core/shuffle/SinglePartitioner.cc b/cpp/core/shuffle/SinglePartitioner.cc index c4f80ce79..981a5b8e4 100644 --- a/cpp/core/shuffle/SinglePartitioner.cc +++ b/cpp/core/shuffle/SinglePartitioner.cc @@ -28,4 +28,13 @@ arrow::Status gluten::SinglePartitioner::compute( return arrow::Status::OK(); } +arrow::Status gluten::SinglePartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) { + // nothing is need do here + return arrow::Status::OK(); +} + } // namespace gluten diff --git a/cpp/core/shuffle/SinglePartitioner.h b/cpp/core/shuffle/SinglePartitioner.h index d3d2c29f7..e5d7a920f 100644 --- a/cpp/core/shuffle/SinglePartitioner.h +++ b/cpp/core/shuffle/SinglePartitioner.h @@ -29,5 +29,11 @@ class SinglePartitioner final : public Partitioner { const int64_t numRows, std::vector<uint32_t>& row2partition, std::vector<uint32_t>& partition2RowCount) override; + + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) override; }; } // namespace gluten diff --git a/cpp/core/shuffle/rss/RssClient.h b/cpp/core/shuffle/rss/RssClient.h index 9209430b0..dddccfa1a 100644 --- a/cpp/core/shuffle/rss/RssClient.h +++ b/cpp/core/shuffle/rss/RssClient.h @@ -21,7 +21,7 @@ class RssClient { public: virtual ~RssClient() = default; - virtual int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t size) = 0; + virtual int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) = 0; virtual void stop() = 0; }; diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc index 15981bf8d..015129e26 100644 --- a/cpp/core/shuffle/rss/RssPartitionWriter.cc +++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc @@ -73,4 +73,11 @@ arrow::Status RssPartitionWriter::evict( partitionId, reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size()); return arrow::Status::OK(); } + +arrow::Status RssPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) { + rawPartitionLengths_[partitionId] += rawSize; + ScopedTimer timer(&spillTime_); + bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, data, length); + return arrow::Status::OK(); +} } // namespace gluten diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h index ef43017fc..b8cc1551c 100644 --- a/cpp/core/shuffle/rss/RssPartitionWriter.h +++ b/cpp/core/shuffle/rss/RssPartitionWriter.h @@ -44,6 +44,8 @@ class RssPartitionWriter final : public RemotePartitionWriter { bool reuseBuffers, bool hasComplexType) override; + arrow::Status evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) override; + arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override; arrow::Status stop(ShuffleWriterMetrics* metrics) override; diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 9e5f08b1c..c058883b6 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -310,7 +310,8 @@ set(VELOX_SRCS operators/serializer/VeloxRowToColumnarConverter.cc operators/writer/VeloxParquetDatasource.cc shuffle/VeloxShuffleReader.cc - shuffle/VeloxShuffleWriter.cc + shuffle/VeloxHashBasedShuffleWriter.cc + shuffle/VeloxSortBasedShuffleWriter.cc substrait/SubstraitParser.cc substrait/SubstraitToVeloxExpr.cc substrait/SubstraitToVeloxPlan.cc diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 14593c8df..71d3d96b5 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -31,6 +31,7 @@ #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" #include "shuffle/LocalPartitionWriter.h" +#include "shuffle/VeloxHashBasedShuffleWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "shuffle/rss/RssPartitionWriter.h" #include "utils/StringUtil.h" @@ -111,7 +112,7 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter( options.partitioning = gluten::toPartitioning(FLAGS_partitioning); GLUTEN_ASSIGN_OR_THROW( auto shuffleWriter, - VeloxShuffleWriter::create( + VeloxHashBasedShuffleWriter::create( FLAGS_shuffle_partitions, std::move(partitionWriter), std::move(options), @@ -191,7 +192,7 @@ auto BM_Generic = [](::benchmark::State& state, GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv)); const auto& shuffleWriter = createShuffleWriter(memoryManager.get(), dataFile, localDirs); while (resultIter->hasNext()) { - GLUTEN_THROW_NOT_OK(shuffleWriter->split(resultIter->next(), ShuffleWriter::kMinMemLimit)); + GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), ShuffleWriter::kMinMemLimit)); } GLUTEN_THROW_NOT_OK(shuffleWriter->stop()); TIME_NANO_END(shuffleWriteTime); diff --git a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc index 0109de603..4a4bb69b8 100644 --- a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc +++ b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc @@ -31,6 +31,7 @@ #include "benchmarks/common/BenchmarkUtils.h" #include "memory/ColumnarBatch.h" #include "shuffle/LocalPartitionWriter.h" +#include "shuffle/VeloxHashBasedShuffleWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "utils/TestUtils.h" #include "utils/VeloxArrowUtils.h" @@ -259,7 +260,7 @@ class BenchmarkShuffleSplitCacheScanBenchmark : public BenchmarkShuffleSplit { numPartitions, PartitionWriterOptions{}, defaultArrowMemoryPool().get(), dataFile, localDirs); GLUTEN_ASSIGN_OR_THROW( shuffleWriter, - VeloxShuffleWriter::create( + VeloxHashBasedShuffleWriter::create( numPartitions, std::move(partitionWriter), std::move(options), @@ -294,7 +295,7 @@ class BenchmarkShuffleSplitCacheScanBenchmark : public BenchmarkShuffleSplit { [&shuffleWriter, &splitTime](const std::shared_ptr<arrow::RecordBatch>& recordBatch) { std::shared_ptr<ColumnarBatch> cb; ARROW_ASSIGN_OR_THROW(cb, recordBatch2VeloxColumnarBatch(*recordBatch)); - TIME_NANO_OR_THROW(splitTime, shuffleWriter->split(cb, ShuffleWriter::kMinMemLimit)); + TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb, ShuffleWriter::kMinMemLimit)); }); // LOG(INFO) << " split done memory allocated = " << // options.memoryPool->bytes_allocated(); @@ -327,7 +328,7 @@ class BenchmarkShuffleSplitIterateScanBenchmark : public BenchmarkShuffleSplit { numPartitions, PartitionWriterOptions{}, defaultArrowMemoryPool().get(), dataFile, localDirs); GLUTEN_ASSIGN_OR_THROW( shuffleWriter, - VeloxShuffleWriter::create( + VeloxHashBasedShuffleWriter::create( numPartitions, std::move(partitionWriter), std::move(options), @@ -350,7 +351,7 @@ class BenchmarkShuffleSplitIterateScanBenchmark : public BenchmarkShuffleSplit { numRows += recordBatch->num_rows(); std::shared_ptr<ColumnarBatch> cb; ARROW_ASSIGN_OR_THROW(cb, recordBatch2VeloxColumnarBatch(*recordBatch)); - TIME_NANO_OR_THROW(splitTime, shuffleWriter->split(cb, ShuffleWriter::kMinMemLimit)); + TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb, ShuffleWriter::kMinMemLimit)); TIME_NANO_OR_THROW(elapseRead, recordBatchReader->ReadNext(&recordBatch)); } } diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index a3e8c159c..15c84b41c 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -28,8 +28,9 @@ #include "compute/VeloxPlanConverter.h" #include "config/VeloxConfig.h" #include "operators/serializer/VeloxRowToColumnarConverter.h" +#include "shuffle/VeloxHashBasedShuffleWriter.h" #include "shuffle/VeloxShuffleReader.h" -#include "shuffle/VeloxShuffleWriter.h" +#include "shuffle/VeloxSortBasedShuffleWriter.h" #include "utils/ConfigExtractor.h" #include "utils/VeloxArrowUtils.h" @@ -187,10 +188,19 @@ std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter( MemoryManager* memoryManager) { auto ctxPool = getLeafVeloxPool(memoryManager); auto arrowPool = memoryManager->getArrowMemoryPool(); - GLUTEN_ASSIGN_OR_THROW( - auto shuffle_writer, - VeloxShuffleWriter::create(numPartitions, std::move(partitionWriter), std::move(options), ctxPool, arrowPool)); - return shuffle_writer; + std::shared_ptr<ShuffleWriter> shuffleWriter; + if (options.shuffleWriterType == kHashShuffle) { + GLUTEN_ASSIGN_OR_THROW( + shuffleWriter, + VeloxHashBasedShuffleWriter::create( + numPartitions, std::move(partitionWriter), std::move(options), ctxPool, arrowPool)); + } else if (options.shuffleWriterType == kSortShuffle) { + GLUTEN_ASSIGN_OR_THROW( + shuffleWriter, + VeloxSortBasedShuffleWriter::create( + numPartitions, std::move(partitionWriter), std::move(options), ctxPool, arrowPool)); + } + return shuffleWriter; } std::shared_ptr<Datasource> VeloxRuntime::createDatasource( @@ -242,9 +252,18 @@ std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader( auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema)); auto codec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend); auto ctxVeloxPool = getLeafVeloxPool(memoryManager); + auto veloxCompressionType = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr); auto deserializerFactory = std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>( - schema, std::move(codec), rowType, options.batchSize, pool, ctxVeloxPool); - return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory)); + schema, + std::move(codec), + veloxCompressionType, + rowType, + options.batchSize, + pool, + ctxVeloxPool, + options.shuffleWriterType); + auto reader = std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory)); + return reader; } std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerializer( diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc similarity index 90% rename from cpp/velox/shuffle/VeloxShuffleWriter.cc rename to cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc index b304565e5..daff13703 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "VeloxShuffleWriter.h" +#include "VeloxHashBasedShuffleWriter.h" #include "memory/ArrowMemory.h" #include "memory/VeloxColumnarBatch.h" #include "memory/VeloxMemoryManager.h" @@ -70,58 +70,6 @@ bool vectorHasNull(const facebook::velox::VectorPtr& vp) { return vp->countNulls(vp->nulls(), vp->size()) != 0; } -facebook::velox::RowVectorPtr getStrippedRowVector(const facebook::velox::RowVector& rv) { - // get new row type - auto rowType = rv.type()->asRow(); - auto typeChildren = rowType.children(); - typeChildren.erase(typeChildren.begin()); - auto newRowType = facebook::velox::ROW(std::move(typeChildren)); - - // get length - auto length = rv.size(); - - // get children - auto children = rv.children(); - children.erase(children.begin()); - - return std::make_shared<facebook::velox::RowVector>( - rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), length, std::move(children)); -} - -const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) { - VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column."); - - auto& firstChild = rv.childAt(0); - VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not flat encoding."); - VELOX_CHECK( - firstChild->type()->isInteger(), - "Partition id (field 0) should be integer, but got {}", - firstChild->type()->toString()); - - // first column is partition key hash value or pid - return firstChild->asFlatVector<int32_t>()->rawValues(); -} - -class EvictGuard { - public: - explicit EvictGuard(EvictState& evictState) : evictState_(evictState) { - evictState_ = EvictState::kUnevictable; - } - - ~EvictGuard() { - evictState_ = EvictState::kEvictable; - } - - // For safety and clarity. - EvictGuard(const EvictGuard&) = delete; - EvictGuard& operator=(const EvictGuard&) = delete; - EvictGuard(EvictGuard&&) = delete; - EvictGuard& operator=(EvictGuard&&) = delete; - - private: - EvictState& evictState_; -}; - class BinaryArrayResizeGuard { public: explicit BinaryArrayResizeGuard(BinaryArrayResizeState& state) : state_(state) { @@ -199,19 +147,19 @@ arrow::Status collectFlatVectorBuffer<facebook::velox::TypeKind::VARBINARY>( } // namespace -arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxShuffleWriter::create( +arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxHashBasedShuffleWriter::create( uint32_t numPartitions, std::unique_ptr<PartitionWriter> partitionWriter, ShuffleWriterOptions options, std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, arrow::MemoryPool* arrowPool) { - std::shared_ptr<VeloxShuffleWriter> res( - new VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool)); + std::shared_ptr<VeloxHashBasedShuffleWriter> res(new VeloxHashBasedShuffleWriter( + numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool)); RETURN_NOT_OK(res->init()); return res; } // namespace gluten -arrow::Status VeloxShuffleWriter::init() { +arrow::Status VeloxHashBasedShuffleWriter::init() { #if defined(__x86_64__) supportAvx512_ = __builtin_cpu_supports("avx512bw"); #else @@ -235,7 +183,7 @@ arrow::Status VeloxShuffleWriter::init() { return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::initPartitions() { +arrow::Status VeloxHashBasedShuffleWriter::initPartitions() { auto simpleColumnCount = simpleColumnIndices_.size(); partitionValidityAddrs_.resize(simpleColumnCount); @@ -260,15 +208,11 @@ arrow::Status VeloxShuffleWriter::initPartitions() { return arrow::Status::OK(); } -int64_t VeloxShuffleWriter::rawPartitionBytes() const { - return std::accumulate(metrics_.rawPartitionLengths.begin(), metrics_.rawPartitionLengths.end(), 0LL); -} - -void VeloxShuffleWriter::setPartitionBufferSize(uint32_t newSize) { +void VeloxHashBasedShuffleWriter::setPartitionBufferSize(uint32_t newSize) { options_.bufferSize = newSize; } -arrow::Result<std::shared_ptr<arrow::Buffer>> VeloxShuffleWriter::generateComplexTypeBuffers( +arrow::Result<std::shared_ptr<arrow::Buffer>> VeloxHashBasedShuffleWriter::generateComplexTypeBuffers( facebook::velox::RowVectorPtr vector) { auto arena = std::make_unique<facebook::velox::StreamArena>(veloxPool_.get()); auto serializer = @@ -291,7 +235,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> VeloxShuffleWriter::generateComple return valueBuffer; } -arrow::Status VeloxShuffleWriter::split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) { +arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) { if (options_.partitioning == Partitioning::kSingle) { auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb); VELOX_CHECK_NOT_NULL(veloxColumnBatch); @@ -357,7 +301,7 @@ arrow::Status VeloxShuffleWriter::split(std::shared_ptr<ColumnarBatch> cb, int64 return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit) { +arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit) { if (partitioner_->hasPid()) { auto pidArr = getFirstColumn(*rv); START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]); @@ -376,7 +320,7 @@ arrow::Status VeloxShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVec return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::stop() { +arrow::Status VeloxHashBasedShuffleWriter::stop() { if (options_.partitioning != Partitioning::kSingle) { for (auto pid = 0; pid < numPartitions_; ++pid) { RETURN_NOT_OK(evictPartitionBuffers(pid, false)); @@ -394,7 +338,7 @@ arrow::Status VeloxShuffleWriter::stop() { return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::buildPartition2Row(uint32_t rowNum) { +arrow::Status VeloxHashBasedShuffleWriter::buildPartition2Row(uint32_t rowNum) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingBuildPartition]); // calc partition2RowOffsetBase_ @@ -427,7 +371,7 @@ arrow::Status VeloxShuffleWriter::buildPartition2Row(uint32_t rowNum) { return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::updateInputHasNull(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::updateInputHasNull(const facebook::velox::RowVector& rv) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingHasNull]); for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) { @@ -444,11 +388,11 @@ arrow::Status VeloxShuffleWriter::updateInputHasNull(const facebook::velox::RowV return arrow::Status::OK(); } -void VeloxShuffleWriter::setSplitState(SplitState state) { +void VeloxHashBasedShuffleWriter::setSplitState(SplitState state) { splitState_ = state; } -arrow::Status VeloxShuffleWriter::doSplit(const facebook::velox::RowVector& rv, int64_t memLimit) { +arrow::Status VeloxHashBasedShuffleWriter::doSplit(const facebook::velox::RowVector& rv, int64_t memLimit) { auto rowNum = rv.size(); RETURN_NOT_OK(buildPartition2Row(rowNum)); RETURN_NOT_OK(updateInputHasNull(rv)); @@ -472,7 +416,7 @@ arrow::Status VeloxShuffleWriter::doSplit(const facebook::velox::RowVector& rv, return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::splitRowVector(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::splitRowVector(const facebook::velox::RowVector& rv) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]); // now start to split the RowVector @@ -489,7 +433,7 @@ arrow::Status VeloxShuffleWriter::splitRowVector(const facebook::velox::RowVecto return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv) { for (auto col = 0; col < fixedWidthColumnCount_; ++col) { auto colIdx = simpleColumnIndices_[col]; auto& column = rv.childAt(colIdx); @@ -543,7 +487,9 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::splitBoolType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs) { +arrow::Status VeloxHashBasedShuffleWriter::splitBoolType( + const uint8_t* srcAddr, + const std::vector<uint8_t*>& dstAddrs) { // assume batch size = 32k; reducer# = 4K; row/reducer = 8 for (auto& pid : partitionUsed_) { // set the last byte @@ -632,7 +578,7 @@ arrow::Status VeloxShuffleWriter::splitBoolType(const uint8_t* srcAddr, const st return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::splitValidityBuffer(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::splitValidityBuffer(const facebook::velox::RowVector& rv) { for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) { auto colIdx = simpleColumnIndices_[col]; auto& column = rv.childAt(colIdx); @@ -660,7 +606,7 @@ arrow::Status VeloxShuffleWriter::splitValidityBuffer(const facebook::velox::Row return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::splitBinaryType( +arrow::Status VeloxHashBasedShuffleWriter::splitBinaryType( uint32_t binaryIdx, const facebook::velox::FlatVector<facebook::velox::StringView>& src, std::vector<BinaryBuf>& dst) { @@ -723,7 +669,7 @@ arrow::Status VeloxShuffleWriter::splitBinaryType( return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::splitBinaryArray(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::splitBinaryArray(const facebook::velox::RowVector& rv) { for (auto col = fixedWidthColumnCount_; col < simpleColumnIndices_.size(); ++col) { auto binaryIdx = col - fixedWidthColumnCount_; auto& dstAddrs = partitionBinaryAddrs_[binaryIdx]; @@ -734,7 +680,7 @@ arrow::Status VeloxShuffleWriter::splitBinaryArray(const facebook::velox::RowVec return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::splitComplexType(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::splitComplexType(const facebook::velox::RowVector& rv) { if (complexColumnIndices_.size() == 0) { return arrow::Status::OK(); } @@ -773,7 +719,7 @@ arrow::Status VeloxShuffleWriter::splitComplexType(const facebook::velox::RowVec return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::initColumnTypes(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::initColumnTypes(const facebook::velox::RowVector& rv) { schema_ = toArrowSchema(rv.type(), veloxPool_.get()); for (size_t i = 0; i < rv.childrenSize(); ++i) { veloxColumnTypes_.push_back(rv.childAt(i)->type()); @@ -837,7 +783,7 @@ arrow::Status VeloxShuffleWriter::initColumnTypes(const facebook::velox::RowVect return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) { +arrow::Status VeloxHashBasedShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) { if (veloxColumnTypes_.empty()) { RETURN_NOT_OK(initColumnTypes(rv)); RETURN_NOT_OK(initPartitions()); @@ -846,13 +792,13 @@ arrow::Status VeloxShuffleWriter::initFromRowVector(const facebook::velox::RowVe return arrow::Status::OK(); } -inline bool VeloxShuffleWriter::beyondThreshold(uint32_t partitionId, uint32_t newSize) { +inline bool VeloxHashBasedShuffleWriter::beyondThreshold(uint32_t partitionId, uint32_t newSize) { auto currentBufferSize = partitionBufferSize_[partitionId]; return newSize > (1 + options_.bufferReallocThreshold) * currentBufferSize || newSize < (1 - options_.bufferReallocThreshold) * currentBufferSize; } -void VeloxShuffleWriter::calculateSimpleColumnBytes() { +void VeloxHashBasedShuffleWriter::calculateSimpleColumnBytes() { fixedWidthBufferBytes_ = 0; for (size_t col = 0; col < fixedWidthColumnCount_; ++col) { auto colIdx = simpleColumnIndices_[col]; @@ -862,7 +808,9 @@ void VeloxShuffleWriter::calculateSimpleColumnBytes() { fixedWidthBufferBytes_ += kSizeOfBinaryArrayLengthBuffer * binaryColumnIndices_.size(); } -uint32_t VeloxShuffleWriter::calculatePartitionBufferSize(const facebook::velox::RowVector& rv, int64_t memLimit) { +uint32_t VeloxHashBasedShuffleWriter::calculatePartitionBufferSize( + const facebook::velox::RowVector& rv, + int64_t memLimit) { auto bytesPerRow = fixedWidthBufferBytes_; SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCalculateBufferSize]); @@ -915,7 +863,7 @@ uint32_t VeloxShuffleWriter::calculatePartitionBufferSize(const facebook::velox: } arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> -VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize) { +VeloxHashBasedShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize) { if (inputHasNull_[col]) { ARROW_ASSIGN_OR_RAISE( auto validityBuffer, @@ -929,7 +877,7 @@ VeloxShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId, u return nullptr; } -arrow::Status VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, uint32_t newSize) { +arrow::Status VeloxHashBasedShuffleWriter::updateValidityBuffers(uint32_t partitionId, uint32_t newSize) { for (auto i = 0; i < simpleColumnIndices_.size(); ++i) { // If the validity buffer is not yet allocated, allocate and fill 0xff based on inputHasNull_. if (partitionValidityAddrs_[i][partitionId] == nullptr) { @@ -940,7 +888,7 @@ arrow::Status VeloxShuffleWriter::updateValidityBuffers(uint32_t partitionId, ui return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize) { +arrow::Status VeloxHashBasedShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingAllocateBuffer]); for (auto i = 0; i < simpleColumnIndices_.size(); ++i) { @@ -987,7 +935,7 @@ arrow::Status VeloxShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::evictBuffers( +arrow::Status VeloxHashBasedShuffleWriter::evictBuffers( uint32_t partitionId, uint32_t numRows, std::vector<std::shared_ptr<arrow::Buffer>> buffers, @@ -1000,7 +948,7 @@ arrow::Status VeloxShuffleWriter::evictBuffers( return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) { +arrow::Status VeloxHashBasedShuffleWriter::evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) { auto numRows = partitionBufferBase_[partitionId]; if (numRows > 0) { ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(partitionId, reuseBuffers)); @@ -1009,7 +957,7 @@ arrow::Status VeloxShuffleWriter::evictPartitionBuffers(uint32_t partitionId, bo return arrow::Status::OK(); } -arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxShuffleWriter::assembleBuffers( +arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxHashBasedShuffleWriter::assembleBuffers( uint32_t partitionId, bool reuseBuffers) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]); @@ -1150,7 +1098,7 @@ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxShuffleWriter::a return allBuffers; } -arrow::Status VeloxShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual) { +arrow::Status VeloxHashBasedShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual) { if (evictState_ == EvictState::kUnevictable) { *actual = 0; return arrow::Status::OK(); @@ -1174,7 +1122,7 @@ arrow::Status VeloxShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual return arrow::Status::OK(); } -arrow::Result<int64_t> VeloxShuffleWriter::evictCachedPayload(int64_t size) { +arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictCachedPayload(int64_t size) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingEvictPartition]); int64_t actual; auto before = partitionBufferPool_->bytes_allocated(); @@ -1188,7 +1136,7 @@ arrow::Result<int64_t> VeloxShuffleWriter::evictCachedPayload(int64_t size) { return actual; } -arrow::Status VeloxShuffleWriter::resetValidityBuffer(uint32_t partitionId) { +arrow::Status VeloxHashBasedShuffleWriter::resetValidityBuffer(uint32_t partitionId) { std::for_each(partitionBuffers_.begin(), partitionBuffers_.end(), [partitionId](auto& bufs) { if (bufs[partitionId].size() != 0 && bufs[partitionId][kValidityBufferIndex] != nullptr) { // initialize all true once allocated @@ -1199,7 +1147,8 @@ arrow::Status VeloxShuffleWriter::resetValidityBuffer(uint32_t partitionId) { return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool preserveData) { +arrow::Status +VeloxHashBasedShuffleWriter::resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool preserveData) { for (auto i = 0; i < simpleColumnIndices_.size(); ++i) { auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id(); auto& buffers = partitionBuffers_[i][partitionId]; @@ -1278,7 +1227,7 @@ arrow::Status VeloxShuffleWriter::resizePartitionBuffer(uint32_t partitionId, ui return arrow::Status::OK(); } -arrow::Status VeloxShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) { +arrow::Status VeloxHashBasedShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) { auto bufferSize = partitionBufferSize_[partitionId]; if (bufferSize == 0) { return arrow::Status::OK(); @@ -1301,11 +1250,11 @@ arrow::Status VeloxShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) { return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true); } -uint64_t VeloxShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize) { +uint64_t VeloxHashBasedShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize) { return (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) / totalInputNumRows_ * newSize + 1024; } -uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, uint32_t newSize) { +uint64_t VeloxHashBasedShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, uint32_t newSize) { uint64_t valueBufferSize = 0; auto columnIdx = simpleColumnIndices_[fixedWidthIndex]; if (arrowColumnTypes_[columnIdx]->id() == arrow::BooleanType::type_id) { @@ -1320,7 +1269,7 @@ uint64_t VeloxShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWid return valueBufferSize; } -void VeloxShuffleWriter::stat() const { +void VeloxHashBasedShuffleWriter::stat() const { #if VELOX_SHUFFLE_WRITER_LOG_FLAG for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) { std::ostringstream oss; @@ -1336,7 +1285,7 @@ void VeloxShuffleWriter::stat() const { #endif } -arrow::Status VeloxShuffleWriter::resetPartitionBuffer(uint32_t partitionId) { +arrow::Status VeloxHashBasedShuffleWriter::resetPartitionBuffer(uint32_t partitionId) { // Reset fixed-width partition buffers for (auto i = 0; i < fixedWidthColumnCount_; ++i) { partitionValidityAddrs_[i][partitionId] = nullptr; @@ -1356,11 +1305,11 @@ arrow::Status VeloxShuffleWriter::resetPartitionBuffer(uint32_t partitionId) { return arrow::Status::OK(); } -const uint64_t VeloxShuffleWriter::cachedPayloadSize() const { +const uint64_t VeloxHashBasedShuffleWriter::cachedPayloadSize() const { return partitionWriter_->cachedPayloadSize(); } -arrow::Result<int64_t> VeloxShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) { +arrow::Result<int64_t> VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) { // Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_) std::vector<std::pair<uint32_t, uint32_t>> pidToSize; for (auto pid = 0; pid < numPartitions_; ++pid) { @@ -1388,7 +1337,7 @@ arrow::Result<int64_t> VeloxShuffleWriter::shrinkPartitionBuffersMinSize(int64_t return shrunken; } -arrow::Result<int64_t> VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t size) { +arrow::Result<int64_t> VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) { // Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from // shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_ int64_t beforeEvict = partitionBufferPool_->bytes_allocated(); @@ -1415,7 +1364,7 @@ arrow::Result<int64_t> VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t return evicted; } -bool VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const { +bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const { // If OOM happens during SplitState::kSplit, it is triggered by binary buffers resize. // Or during SplitState::kInit, it is triggered by other operators. // The reclaim order is spill->shrink, because the partition buffers can be reused. @@ -1424,13 +1373,13 @@ bool VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const { (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit); } -bool VeloxShuffleWriter::evictPartitionBuffersAfterSpill() const { +bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const { // If OOM triggered by other operators, the splitState_ is SplitState::kInit. // The last resort is to evict the partition buffers to reclaim more space. return options_.partitioning != Partitioning::kSingle && splitState_ == SplitState::kInit; } -arrow::Result<uint32_t> VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const { +arrow::Result<uint32_t> VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const { if (splitState_ == SplitState::kSplit) { return partitionBufferBase_[partitionId] + partition2RowCount_[partitionId]; } @@ -1440,7 +1389,7 @@ arrow::Result<uint32_t> VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint3 return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_)); } -arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) { +arrow::Status VeloxHashBasedShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) { for (auto& pid : partitionUsed_) { auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]); VLOG_IF(9, partitionBufferSize_[pid] != newSize) @@ -1494,7 +1443,7 @@ arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBuff return arrow::Status::OK(); } -bool VeloxShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const { +bool VeloxHashBasedShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const { return (rv->size() > maxBatchSize_ && maxBatchSize_ > 0); } diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h similarity index 83% copy from cpp/velox/shuffle/VeloxShuffleWriter.h copy to cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h index e699a323b..a11f84e95 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h @@ -36,10 +36,10 @@ #include <arrow/result.h> #include <arrow/type.h> +#include "VeloxShuffleWriter.h" #include "memory/VeloxMemoryManager.h" #include "shuffle/PartitionWriter.h" #include "shuffle/Partitioner.h" -#include "shuffle/ShuffleWriter.h" #include "shuffle/Utils.h" #include "utils/Print.h" @@ -88,7 +88,6 @@ namespace gluten { #endif // end of VELOX_SHUFFLE_WRITER_PRINT enum SplitState { kInit, kPreAlloc, kSplit, kStop }; -enum EvictState { kEvictable, kUnevictable }; struct BinaryArrayResizeState { bool inResize; @@ -100,7 +99,7 @@ struct BinaryArrayResizeState { : inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {} }; -class VeloxShuffleWriter final : public ShuffleWriter { +class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter { enum { kValidityBufferIndex = 0, kFixedWidthValueBufferIndex = 1, @@ -130,7 +129,7 @@ class VeloxShuffleWriter final : public ShuffleWriter { std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, arrow::MemoryPool* arrowPool); - arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override; + arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override; arrow::Status stop() override; @@ -138,12 +137,10 @@ class VeloxShuffleWriter final : public ShuffleWriter { const uint64_t cachedPayloadSize() const override; - arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers); - - int64_t rawPartitionBytes() const; + arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) override; // For test only. - void setPartitionBufferSize(uint32_t newSize); + void setPartitionBufferSize(uint32_t newSize) override; // for debugging void printColumnsInfo() const { @@ -192,22 +189,14 @@ class VeloxShuffleWriter final : public ShuffleWriter { VS_PRINT_CONTAINER(input_has_null_); } - int32_t maxBatchSize() const { - return maxBatchSize_; - } - private: - VeloxShuffleWriter( + VeloxHashBasedShuffleWriter( uint32_t numPartitions, std::unique_ptr<PartitionWriter> partitionWriter, ShuffleWriterOptions options, std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, arrow::MemoryPool* pool) - : ShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), pool), - veloxPool_(std::move(veloxPool)) { - arenas_.resize(numPartitions); - serdeOptions_.useLosslessTimestamp = true; - } + : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {} arrow::Status init(); @@ -314,14 +303,8 @@ class VeloxShuffleWriter final : public ShuffleWriter { arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit); - SplitState splitState_{kInit}; - - EvictState evictState_{kEvictable}; - BinaryArrayResizeState binaryArrayResizeState_{}; - bool supportAvx512_ = false; - bool hasComplexType_ = false; std::vector<bool> isValidityBuffer_; @@ -415,66 +398,9 @@ class VeloxShuffleWriter final : public ShuffleWriter { std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_; std::shared_ptr<const facebook::velox::RowType> complexWriteType_; - std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_; - std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_; facebook::velox::serializer::presto::PrestoVectorSerde serde_; - facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; - - // stat - enum CpuWallTimingType { - CpuWallTimingBegin = 0, - CpuWallTimingCompute = CpuWallTimingBegin, - CpuWallTimingBuildPartition, - CpuWallTimingEvictPartition, - CpuWallTimingHasNull, - CpuWallTimingCalculateBufferSize, - CpuWallTimingAllocateBuffer, - CpuWallTimingCreateRbFromBuffer, - CpuWallTimingMakeRB, - CpuWallTimingCacheRB, - CpuWallTimingFlattenRV, - CpuWallTimingSplitRV, - CpuWallTimingIteratePartitions, - CpuWallTimingStop, - CpuWallTimingEnd, - CpuWallTimingNum = CpuWallTimingEnd - CpuWallTimingBegin - }; - - static std::string CpuWallTimingName(CpuWallTimingType type) { - switch (type) { - case CpuWallTimingCompute: - return "CpuWallTimingCompute"; - case CpuWallTimingBuildPartition: - return "CpuWallTimingBuildPartition"; - case CpuWallTimingEvictPartition: - return "CpuWallTimingEvictPartition"; - case CpuWallTimingHasNull: - return "CpuWallTimingHasNull"; - case CpuWallTimingCalculateBufferSize: - return "CpuWallTimingCalculateBufferSize"; - case CpuWallTimingAllocateBuffer: - return "CpuWallTimingAllocateBuffer"; - case CpuWallTimingCreateRbFromBuffer: - return "CpuWallTimingCreateRbFromBuffer"; - case CpuWallTimingMakeRB: - return "CpuWallTimingMakeRB"; - case CpuWallTimingCacheRB: - return "CpuWallTimingCacheRB"; - case CpuWallTimingFlattenRV: - return "CpuWallTimingFlattenRV"; - case CpuWallTimingSplitRV: - return "CpuWallTimingSplitRV"; - case CpuWallTimingIteratePartitions: - return "CpuWallTimingIteratePartitions"; - case CpuWallTimingStop: - return "CpuWallTimingStop"; - default: - return "CpuWallTimingUnknown"; - } - } - facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum]; - int32_t maxBatchSize_{0}; -}; // class VeloxShuffleWriter + SplitState splitState_{kInit}; +}; // class VeloxHashBasedShuffleWriter } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 5c4e01936..22298ef91 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -370,39 +370,104 @@ std::shared_ptr<ColumnarBatch> VeloxColumnarBatchDeserializer::next() { VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory( const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<arrow::util::Codec>& codec, + const facebook::velox::common::CompressionKind veloxCompressionType, const RowTypePtr& rowType, int32_t batchSize, arrow::MemoryPool* memoryPool, - std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool) + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, + ShuffleWriterType shuffleWriterType) : schema_(schema), codec_(codec), + veloxCompressionType_(veloxCompressionType), rowType_(rowType), batchSize_(batchSize), memoryPool_(memoryPool), - veloxPool_(veloxPool) { + veloxPool_(veloxPool), + shuffleWriterType_(shuffleWriterType) { initFromSchema(); } std::unique_ptr<ColumnarBatchIterator> VeloxColumnarBatchDeserializerFactory::createDeserializer( std::shared_ptr<arrow::io::InputStream> in) { - return std::make_unique<VeloxColumnarBatchDeserializer>( - std::move(in), - schema_, - codec_, + if (shuffleWriterType_ == kHashShuffle) { + return std::make_unique<VeloxColumnarBatchDeserializer>( + std::move(in), + schema_, + codec_, + rowType_, + batchSize_, + memoryPool_, + veloxPool_.get(), + &isValidityBuffer_, + hasComplexType_, + deserializeTime_, + decompressTime_); + } + return std::make_unique<VeloxShuffleReaderOutStreamWrapper>( + veloxPool_, rowType_, batchSize_, - memoryPool_, - veloxPool_.get(), - &isValidityBuffer_, - hasComplexType_, - deserializeTime_, - decompressTime_); + veloxCompressionType_, + [this](int64_t decompressionTime) { this->decompressTime_ += decompressionTime; }, + [this](int64_t deserializeTime) { this->deserializeTime_ += deserializeTime; }, + in); +} + +VeloxShuffleReaderOutStreamWrapper::VeloxShuffleReaderOutStreamWrapper( + const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool, + const RowTypePtr& rowType, + int32_t batchSize, + facebook::velox::common::CompressionKind veloxCompressionType, + const std::function<void(int64_t)> decompressionTimeAccumulator, + const std::function<void(int64_t)> deserializeTimeAccumulator, + const std::shared_ptr<arrow::io::InputStream> in) + : veloxPool_(veloxPool), + rowType_(rowType), + batchSize_(batchSize), + veloxCompressionType_(veloxCompressionType), + decompressionTimeAccumulator_(decompressionTimeAccumulator), + deserializeTimeAccumulator_(deserializeTimeAccumulator) { + constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize; + auto buffer = AlignedBuffer::allocate<char>(kMaxReadBufferSize, veloxPool_.get()); + in_ = std::make_unique<VeloxInputStream>(std::move(in), std::move(buffer)); + serdeOptions_ = {false, veloxCompressionType_}; + RowVectorPtr rowVector; +} + +std::shared_ptr<ColumnarBatch> VeloxShuffleReaderOutStreamWrapper::next() { + if (!in_->hasNext()) { + return nullptr; + } + + RowVectorPtr rowVector; + VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, &rowVector, &serdeOptions_); + + if (rowVector->size() >= batchSize_) { + return std::make_shared<VeloxColumnarBatch>(std::move(rowVector)); + } + + while (rowVector->size() < batchSize_ && in_->hasNext()) { + RowVectorPtr rowVectorTemp; + VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, &rowVectorTemp, &serdeOptions_); + rowVector->append(rowVectorTemp.get()); + } + + int64_t decompressTime = 0LL; + int64_t deserializeTime = 0LL; + + decompressionTimeAccumulator_(decompressTime); + deserializeTimeAccumulator_(deserializeTime); + return std::make_shared<VeloxColumnarBatch>(std::move(rowVector)); } arrow::MemoryPool* VeloxColumnarBatchDeserializerFactory::getPool() { return memoryPool_; } +ShuffleWriterType VeloxColumnarBatchDeserializerFactory::getShuffleWriterType() { + return shuffleWriterType_; +} + int64_t VeloxColumnarBatchDeserializerFactory::getDecompressTime() { return decompressTime_; } @@ -440,4 +505,30 @@ void VeloxColumnarBatchDeserializerFactory::initFromSchema() { } } } + +VeloxInputStream::VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, facebook::velox::BufferPtr buffer) + : in_(std::move(input)), buffer_(std::move(buffer)) { + next(true); +} + +bool VeloxInputStream::hasNext() { + if (offset_ == 0) { + return false; + } + if (ranges()[0].position >= ranges()[0].size) { + next(true); + return offset_ != 0; + } + return true; +} + +void VeloxInputStream::next(bool throwIfPastEnd) { + const uint32_t readBytes = buffer_->capacity(); + offset_ = in_->Read(readBytes, buffer_->asMutable<char>()).ValueOr(0); + if (offset_ > 0) { + int32_t realBytes = offset_; + VELOX_CHECK_LT(0, realBytes, "Reading past end of spill file"); + setRange({buffer_->asMutable<uint8_t>(), realBytes, 0}); + } +} } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 18df38006..3a0d8f9ff 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -22,6 +22,8 @@ #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" +#include <velox/serializers/PrestoSerializer.h> + namespace gluten { class VeloxColumnarBatchDeserializer final : public ColumnarBatchIterator { @@ -59,15 +61,57 @@ class VeloxColumnarBatchDeserializer final : public ColumnarBatchIterator { bool reachEos_{false}; }; +class VeloxInputStream : public facebook::velox::ByteInputStream { + public: + VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, facebook::velox::BufferPtr buffer); + + bool hasNext(); + + void next(bool throwIfPastEnd) override; + + std::shared_ptr<arrow::io::InputStream> in_; + const facebook::velox::BufferPtr buffer_; + uint64_t offset_ = -1; +}; + +class VeloxShuffleReaderOutStreamWrapper : public ColumnarBatchIterator { + public: + VeloxShuffleReaderOutStreamWrapper( + const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool, + const facebook::velox::RowTypePtr& rowType, + int32_t batchSize, + const facebook::velox::common::CompressionKind veloxCompressionType, + const std::function<void(int64_t)> decompressionTimeAccumulator, + const std::function<void(int64_t)> deserializeTimeAccumulator, + const std::shared_ptr<arrow::io::InputStream> in); + + std::shared_ptr<ColumnarBatch> next(); + + private: + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_; + facebook::velox::RowTypePtr rowType_; + std::vector<facebook::velox::RowVectorPtr> batches_; + bool reachEos_{false}; + int32_t rowCount_; + int32_t batchSize_; + facebook::velox::common::CompressionKind veloxCompressionType_; + facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; + std::function<void(int64_t)> decompressionTimeAccumulator_; + std::function<void(int64_t)> deserializeTimeAccumulator_; + std::shared_ptr<VeloxInputStream> in_; +}; + class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { public: VeloxColumnarBatchDeserializerFactory( const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<arrow::util::Codec>& codec, + const facebook::velox::common::CompressionKind veloxCompressionType, const facebook::velox::RowTypePtr& rowType, int32_t batchSize, arrow::MemoryPool* memoryPool, - std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool); + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, + ShuffleWriterType shuffleWriterType); std::unique_ptr<ColumnarBatchIterator> createDeserializer(std::shared_ptr<arrow::io::InputStream> in) override; @@ -77,9 +121,12 @@ class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { int64_t getDeserializeTime() override; + ShuffleWriterType getShuffleWriterType() override; + private: std::shared_ptr<arrow::Schema> schema_; std::shared_ptr<arrow::util::Codec> codec_; + facebook::velox::common::CompressionKind veloxCompressionType_; facebook::velox::RowTypePtr rowType_; int32_t batchSize_; arrow::MemoryPool* memoryPool_; @@ -88,6 +135,8 @@ class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { std::vector<bool> isValidityBuffer_; bool hasComplexType_{false}; + ShuffleWriterType shuffleWriterType_; + int64_t deserializeTime_{0}; int64_t decompressTime_{0}; diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index e699a323b..104b87616 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -46,157 +46,60 @@ namespace gluten { -// set 1 to open print -#define VELOX_SHUFFLE_WRITER_PRINT 0 - -#if VELOX_SHUFFLE_WRITER_PRINT - -#define VsPrint Print -#define VsPrintLF PrintLF -#define VsPrintSplit PrintSplit -#define VsPrintSplitLF PrintSplitLF -#define VsPrintVectorRange PrintVectorRange -#define VS_PRINT PRINT -#define VS_PRINTLF PRINTLF -#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME -#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE -#define VS_PRINT_CONTAINER PRINT_CONTAINER -#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING -#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING -#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING -#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING -#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING - -#else // VELOX_SHUFFLE_WRITER_PRINT - -#define VsPrint(...) // NOLINT -#define VsPrintLF(...) // NOLINT -#define VsPrintSplit(...) // NOLINT -#define VsPrintSplitLF(...) // NOLINT -#define VsPrintVectorRange(...) // NOLINT -#define VS_PRINT(a) -#define VS_PRINTLF(a) -#define VS_PRINT_FUNCTION_NAME() -#define VS_PRINT_FUNCTION_SPLIT_LINE() -#define VS_PRINT_CONTAINER(c) -#define VS_PRINT_CONTAINER_TO_STRING(c) -#define VS_PRINT_CONTAINER_2_STRING(c) -#define VS_PRINT_VECTOR_TO_STRING(v) -#define VS_PRINT_VECTOR_2_STRING(v) -#define VS_PRINT_VECTOR_MAPPING(v) - -#endif // end of VELOX_SHUFFLE_WRITER_PRINT - -enum SplitState { kInit, kPreAlloc, kSplit, kStop }; -enum EvictState { kEvictable, kUnevictable }; - -struct BinaryArrayResizeState { - bool inResize; - uint32_t partitionId; - uint32_t binaryIdx; - - BinaryArrayResizeState() : inResize(false) {} - BinaryArrayResizeState(uint32_t partitionId, uint32_t binaryIdx) - : inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {} -}; - -class VeloxShuffleWriter final : public ShuffleWriter { - enum { - kValidityBufferIndex = 0, - kFixedWidthValueBufferIndex = 1, - kBinaryValueBufferIndex = 2, - kBinaryLengthBufferIndex = kFixedWidthValueBufferIndex - }; - +class VeloxShuffleWriter : public ShuffleWriter { public: - struct BinaryBuf { - BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn, uint64_t valueOffsetIn) - : valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {} - - BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) : BinaryBuf(value, length, valueCapacity, 0) {} - - BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {} - - uint8_t* valuePtr; - uint8_t* lengthPtr; - uint64_t valueCapacity; - uint64_t valueOffset; - }; - - static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create( - uint32_t numPartitions, - std::unique_ptr<PartitionWriter> partitionWriter, - ShuffleWriterOptions options, - std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, - arrow::MemoryPool* arrowPool); - - arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override; - - arrow::Status stop() override; - - arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override; - - const uint64_t cachedPayloadSize() const override; - - arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers); - - int64_t rawPartitionBytes() const; - - // For test only. - void setPartitionBufferSize(uint32_t newSize); + facebook::velox::RowVectorPtr getStrippedRowVector(const facebook::velox::RowVector& rv) { + // get new row type + auto rowType = rv.type()->asRow(); + auto typeChildren = rowType.children(); + typeChildren.erase(typeChildren.begin()); + auto newRowType = facebook::velox::ROW(std::move(typeChildren)); + + // get length + auto length = rv.size(); + + // get children + auto children = rv.children(); + children.erase(children.begin()); + + return std::make_shared<facebook::velox::RowVector>( + rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), length, std::move(children)); + } - // for debugging - void printColumnsInfo() const { - VS_PRINT_FUNCTION_SPLIT_LINE(); - VS_PRINTLF(fixed_width_column_count_); + const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) { + VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column."); - VS_PRINT_CONTAINER(simple_column_indices_); - VS_PRINT_CONTAINER(binary_column_indices_); - VS_PRINT_CONTAINER(complex_column_indices_); + auto& firstChild = rv.childAt(0); + VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not flat encoding."); + VELOX_CHECK( + firstChild->type()->isInteger(), + "Partition id (field 0) should be integer, but got {}", + firstChild->type()->toString()); - VS_PRINT_VECTOR_2_STRING(velox_column_types_); - VS_PRINT_VECTOR_TO_STRING(arrow_column_types_); + // first column is partition key hash value or pid + return firstChild->asFlatVector<int32_t>()->rawValues(); } - void printPartition() const { - VS_PRINT_FUNCTION_SPLIT_LINE(); - // row ID -> partition ID - VS_PRINT_VECTOR_MAPPING(row_2_partition_); - - // partition -> row count - VS_PRINT_VECTOR_MAPPING(partition_2_row_count_); - } + // For test only. + virtual void setPartitionBufferSize(uint32_t newSize) {} - void printPartitionBuffer() const { - VS_PRINT_FUNCTION_SPLIT_LINE(); - VS_PRINT_VECTOR_MAPPING(partition_2_buffer_size_); - VS_PRINT_VECTOR_MAPPING(partitionBufferBase_); + virtual arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) { + return arrow::Status::OK(); } - void printPartition2Row() const { - VS_PRINT_FUNCTION_SPLIT_LINE(); - VS_PRINT_VECTOR_MAPPING(partition2RowOffsetBase_); - -#if VELOX_SHUFFLE_WRITER_PRINT - for (auto pid = 0; pid < numPartitions_; ++pid) { - auto begin = partition2RowOffsetBase_[pid]; - auto end = partition2RowOffsetBase_[pid + 1]; - VsPrint("partition", pid); - VsPrintVectorRange(rowOffset2RowId_, begin, end); - } -#endif + virtual arrow::Status evictRowVector(uint32_t partitionId) { + return arrow::Status::OK(); } - void printInputHasNull() const { - VS_PRINT_FUNCTION_SPLIT_LINE(); - VS_PRINT_CONTAINER(input_has_null_); + virtual const uint64_t cachedPayloadSize() const { + return 0; } int32_t maxBatchSize() const { return maxBatchSize_; } - private: + protected: VeloxShuffleWriter( uint32_t numPartitions, std::unique_ptr<PartitionWriter> partitionWriter, @@ -209,216 +112,19 @@ class VeloxShuffleWriter final : public ShuffleWriter { serdeOptions_.useLosslessTimestamp = true; } - arrow::Status init(); - - arrow::Status initPartitions(); - - arrow::Status initColumnTypes(const facebook::velox::RowVector& rv); - - arrow::Status splitRowVector(const facebook::velox::RowVector& rv); - - arrow::Status initFromRowVector(const facebook::velox::RowVector& rv); - - arrow::Status buildPartition2Row(uint32_t rowNum); - - arrow::Status updateInputHasNull(const facebook::velox::RowVector& rv); - - void setSplitState(SplitState state); - - arrow::Status doSplit(const facebook::velox::RowVector& rv, int64_t memLimit); - - bool beyondThreshold(uint32_t partitionId, uint32_t newSize); - - uint32_t calculatePartitionBufferSize(const facebook::velox::RowVector& rv, int64_t memLimit); - - arrow::Status preAllocPartitionBuffers(uint32_t preAllocBufferSize); - - arrow::Status updateValidityBuffers(uint32_t partitionId, uint32_t newSize); - - arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> - allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize); - - arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize); - - arrow::Status splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv); - - arrow::Status splitBoolType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs); - - arrow::Status splitValidityBuffer(const facebook::velox::RowVector& rv); + virtual ~VeloxShuffleWriter() = default; - arrow::Status splitBinaryArray(const facebook::velox::RowVector& rv); - - arrow::Status splitComplexType(const facebook::velox::RowVector& rv); - - arrow::Status evictBuffers( - uint32_t partitionId, - uint32_t numRows, - std::vector<std::shared_ptr<arrow::Buffer>> buffers, - bool reuseBuffers); - - arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> assembleBuffers(uint32_t partitionId, bool reuseBuffers); - - template <typename T> - arrow::Status splitFixedType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs) { - for (auto& pid : partitionUsed_) { - auto dstPidBase = (T*)(dstAddrs[pid] + partitionBufferBase_[pid] * sizeof(T)); - auto pos = partition2RowOffsetBase_[pid]; - auto end = partition2RowOffsetBase_[pid + 1]; - for (; pos < end; ++pos) { - auto rowId = rowOffset2RowId_[pos]; - *dstPidBase++ = reinterpret_cast<const T*>(srcAddr)[rowId]; // copy - } - } - return arrow::Status::OK(); - } - - arrow::Status splitBinaryType( - uint32_t binaryIdx, - const facebook::velox::FlatVector<facebook::velox::StringView>& src, - std::vector<BinaryBuf>& dst); - - arrow::Result<int64_t> evictCachedPayload(int64_t size); - - arrow::Result<std::shared_ptr<arrow::Buffer>> generateComplexTypeBuffers(facebook::velox::RowVectorPtr vector); - - arrow::Status resetValidityBuffer(uint32_t partitionId); - - arrow::Result<int64_t> shrinkPartitionBuffersMinSize(int64_t size); - - arrow::Result<int64_t> evictPartitionBuffersMinSize(int64_t size); - - arrow::Status shrinkPartitionBuffer(uint32_t partitionId); - - arrow::Status resetPartitionBuffer(uint32_t partitionId); - - // Resize the partition buffer to newSize. If preserveData is true, it will keep the data in buffer. - // Note when preserveData is false, and newSize is larger, this function can introduce unnecessary memory copy. - // In this case, use allocatePartitionBuffer to free current buffers and allocate new buffers instead. - arrow::Status resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool preserveData); - - uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize); - - uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, uint32_t newSize); - - void calculateSimpleColumnBytes(); - - void stat() const; - - bool shrinkPartitionBuffersAfterSpill() const; - - bool evictPartitionBuffersAfterSpill() const; - - arrow::Result<uint32_t> partitionBufferSizeAfterShrink(uint32_t partitionId) const; - - bool isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const; - - arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit); - - SplitState splitState_{kInit}; + std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_; - EvictState evictState_{kEvictable}; + facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; - BinaryArrayResizeState binaryArrayResizeState_{}; + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_; bool supportAvx512_ = false; - bool hasComplexType_ = false; - std::vector<bool> isValidityBuffer_; - - // Store arrow column types. Calculated once. - std::vector<std::shared_ptr<arrow::DataType>> arrowColumnTypes_; - - // Store velox column types. Calculated once. - std::vector<std::shared_ptr<const facebook::velox::Type>> veloxColumnTypes_; - - // How many fixed-width columns in the schema. Calculated once. - uint32_t fixedWidthColumnCount_ = 0; - - // The column indices of all binary types in the schema. - std::vector<uint32_t> binaryColumnIndices_; - - // The column indices of all fixed-width and binary columns in the schema. - std::vector<uint32_t> simpleColumnIndices_; - - // The column indices of all complex types in the schema, including Struct, Map, List columns. - std::vector<uint32_t> complexColumnIndices_; - - // Total bytes of fixed-width buffers of all simple columns. Including validity buffers, value buffers of - // fixed-width types and length buffers of binary types. - // Used for estimating pre-allocated partition buffer size. Calculated once. - uint32_t fixedWidthBufferBytes_ = 0; - - // Used for calculating the average binary length. - // Updated for each input RowVector. - uint64_t totalInputNumRows_ = 0; - std::vector<uint64_t> binaryArrayTotalSizeBytes_; - - // True if input column has null in any processed input RowVector. - // In the order of fixed-width columns + binary columns. - std::vector<bool> inputHasNull_; - - // Records which partitions are actually occurred in the current input RowVector. - // Most of the loops can loop on this array to avoid visiting unused partition id. - std::vector<uint32_t> partitionUsed_; - - // Row ID -> Partition ID - // subscript: The index of row in the current input RowVector - // value: Partition ID - // Updated for each input RowVector. - std::vector<uint32_t> row2Partition_; - - // Partition ID -> Row Count - // subscript: Partition ID - // value: How many rows does this partition have in the current input RowVector - // Updated for each input RowVector. - std::vector<uint32_t> partition2RowCount_; - - // Note: partition2RowOffsetBase_ and rowOffset2RowId_ are the optimization of flattening the 2-dimensional vector - // into single dimension. - // The first dimension is the partition id. The second dimension is the ith occurrence of this partition in the - // input RowVector. The value is the index of the row in the input RowVector. - // partition2RowOffsetBase_ records the offset of the first dimension. - // - // The index of the ith occurrence of a give partition `pid` in the input RowVector can be calculated via - // rowOffset2RowId_[partition2RowOffsetBase_[pid] + i] - // i is in the range of [0, partition2RowCount_[pid]) - - // Partition ID -> Row offset, elements num: Partition num + 1 - // subscript: Partition ID - // value: The base row offset of this Partition - // Updated for each input RowVector. - std::vector<uint32_t> partition2RowOffsetBase_; - - // Row offset -> Source row ID, elements num: input RowVector row num - // subscript: Row offset - // value: The index of row in the current input RowVector - // Updated for each input RowVector. - std::vector<uint32_t> rowOffset2RowId_; - - // Partition buffers are used for holding the intermediate data during split. - // Partition ID -> Partition buffer size(unit is row) - std::vector<uint32_t> partitionBufferSize_; - - // The write position of partition buffer. Updated after split. Reset when partition buffers are reallocated. - std::vector<uint32_t> partitionBufferBase_; - - // Used by all simple types. Stores raw pointers of partition buffers. - std::vector<std::vector<uint8_t*>> partitionValidityAddrs_; - // Used by fixed-width types. Stores raw pointers of partition buffers. - std::vector<std::vector<uint8_t*>> partitionFixedWidthValueAddrs_; - // Used by binary types. Stores raw pointers and metadata of partition buffers. - std::vector<std::vector<BinaryBuf>> partitionBinaryAddrs_; - - // Used by complex types. - // Partition id -> Serialized complex data. - std::vector<std::unique_ptr<facebook::velox::IterativeVectorSerializer>> complexTypeData_; - std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_; - std::shared_ptr<const facebook::velox::RowType> complexWriteType_; + int32_t maxBatchSize_{0}; - std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_; - std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_; - facebook::velox::serializer::presto::PrestoVectorSerde serde_; - facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; + enum EvictState { kEvictable, kUnevictable }; // stat enum CpuWallTimingType { @@ -474,7 +180,28 @@ class VeloxShuffleWriter final : public ShuffleWriter { } facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum]; - int32_t maxBatchSize_{0}; -}; // class VeloxShuffleWriter + + EvictState evictState_{kEvictable}; + + class EvictGuard { + public: + explicit EvictGuard(EvictState& evictState) : evictState_(evictState) { + evictState_ = EvictState::kUnevictable; + } + + ~EvictGuard() { + evictState_ = EvictState::kEvictable; + } + + // For safety and clarity. + EvictGuard(const EvictGuard&) = delete; + EvictGuard& operator=(const EvictGuard&) = delete; + EvictGuard(EvictGuard&&) = delete; + EvictGuard& operator=(EvictGuard&&) = delete; + + private: + EvictState& evictState_; + }; +}; } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc new file mode 100644 index 000000000..b0c2cc8ad --- /dev/null +++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "VeloxSortBasedShuffleWriter.h" +#include "memory/ArrowMemory.h" +#include "memory/VeloxColumnarBatch.h" +#include "memory/VeloxMemoryManager.h" +#include "shuffle/ShuffleSchema.h" +#include "utils/Common.h" +#include "utils/VeloxArrowUtils.h" +#include "utils/macros.h" +#include "velox/common/base/Nulls.h" +#include "velox/type/Type.h" +#include "velox/vector/ComplexVector.h" + +#if defined(__x86_64__) +#include <immintrin.h> +#include <x86intrin.h> +#elif defined(__aarch64__) +#include <arm_neon.h> +#endif + +namespace gluten { + +#define VELOX_SHUFFLE_WRITER_LOG_FLAG 0 + +// macro to rotate left an 8-bit value 'x' given the shift 's' is a 32-bit integer +// (x is left shifted by 's' modulo 8) OR (x right shifted by (8 - 's' modulo 8)) +#if !defined(__x86_64__) +#define rotateLeft(x, s) (x << (s - ((s >> 3) << 3)) | x >> (8 - (s - ((s >> 3) << 3)))) +#endif + +// on x86 machines, _MM_HINT_T0,T1,T2 are defined as 1, 2, 3 +// equivalent mapping to __builtin_prefetch hints is 3, 2, 1 +#if defined(__x86_64__) +#define PREFETCHT0(ptr) _mm_prefetch(ptr, _MM_HINT_T0) +#define PREFETCHT1(ptr) _mm_prefetch(ptr, _MM_HINT_T1) +#define PREFETCHT2(ptr) _mm_prefetch(ptr, _MM_HINT_T2) +#else +#define PREFETCHT0(ptr) __builtin_prefetch(ptr, 0, 3) +#define PREFETCHT1(ptr) __builtin_prefetch(ptr, 0, 2) +#define PREFETCHT2(ptr) __builtin_prefetch(ptr, 0, 1) +#endif + +arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxSortBasedShuffleWriter::create( + uint32_t numPartitions, + std::unique_ptr<PartitionWriter> partitionWriter, + ShuffleWriterOptions options, + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, + arrow::MemoryPool* arrowPool) { + std::shared_ptr<VeloxSortBasedShuffleWriter> res(new VeloxSortBasedShuffleWriter( + numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool)); + RETURN_NOT_OK(res->init()); + return res; +} // namespace gluten + +arrow::Status VeloxSortBasedShuffleWriter::init() { +#if defined(__x86_64__) + supportAvx512_ = __builtin_cpu_supports("avx512bw"); +#else + supportAvx512_ = false; +#endif + + ARROW_ASSIGN_OR_RAISE( + partitioner_, Partitioner::make(options_.partitioning, numPartitions_, options_.startPartitionId)); + DLOG(INFO) << "Create partitioning type: " << std::to_string(options_.partitioning); + + partition2RowCount_.resize(numPartitions_); + rowVectorIndexMap_.reserve(numPartitions_); + for (auto pid = 0; pid < numPartitions_; ++pid) { + rowVectorIndexMap_[pid].reserve(options_.bufferSize); + } + + return arrow::Status::OK(); +} + +arrow::Status VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit) { + currentInputColumnBytes_ += rv->estimateFlatSize(); + batches_.push_back(rv); + if (currentInputColumnBytes_ > memLimit) { + for (auto pid = 0; pid < numPartitions(); ++pid) { + RETURN_NOT_OK(evictRowVector(pid)); + partition2RowCount_[pid] = 0; + } + batches_.clear(); + currentInputColumnBytes_ = 0; + } + setSortState(SortState::kSortInit); + return arrow::Status::OK(); +} + +arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) { + if (options_.partitioning == Partitioning::kSingle) { + auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb); + VELOX_CHECK_NOT_NULL(veloxColumnBatch); + auto rv = veloxColumnBatch->getFlattenedRowVector(); + RETURN_NOT_OK(initFromRowVector(*rv.get())); + RETURN_NOT_OK(doSort(rv, partitionWriter_.get()->options().sortBufferMaxSize)); + } else if (options_.partitioning == Partitioning::kRange) { + auto compositeBatch = std::dynamic_pointer_cast<CompositeColumnarBatch>(cb); + VELOX_CHECK_NOT_NULL(compositeBatch); + auto batches = compositeBatch->getBatches(); + VELOX_CHECK_EQ(batches.size(), 2); + auto pidBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[0]); + auto pidArr = getFirstColumn(*(pidBatch->getRowVector())); + START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]); + setSortState(SortState::kSort); + RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), batches_.size(), rowVectorIndexMap_)); + END_TIMING(); + auto rvBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[1]); + auto rv = rvBatch->getFlattenedRowVector(); + RETURN_NOT_OK(initFromRowVector(*rv.get())); + RETURN_NOT_OK(doSort(rv, partitionWriter_.get()->options().sortBufferMaxSize)); + } else { + auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb); + VELOX_CHECK_NOT_NULL(veloxColumnBatch); + facebook::velox::RowVectorPtr rv; + START_TIMING(cpuWallTimingList_[CpuWallTimingFlattenRV]); + rv = veloxColumnBatch->getFlattenedRowVector(); + END_TIMING(); + if (partitioner_->hasPid()) { + auto pidArr = getFirstColumn(*rv); + START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]); + setSortState(SortState::kSort); + RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), batches_.size(), rowVectorIndexMap_)); + END_TIMING(); + auto strippedRv = getStrippedRowVector(*rv); + RETURN_NOT_OK(initFromRowVector(*strippedRv)); + RETURN_NOT_OK(doSort(strippedRv, partitionWriter_.get()->options().sortBufferMaxSize)); + } else { + RETURN_NOT_OK(initFromRowVector(*rv)); + START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]); + setSortState(SortState::kSort); + RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), batches_.size(), rowVectorIndexMap_)); + END_TIMING(); + RETURN_NOT_OK(doSort(rv, partitionWriter_.get()->options().sortBufferMaxSize)); + } + } + return arrow::Status::OK(); +} + +arrow::Status VeloxSortBasedShuffleWriter::evictBatch( + uint32_t partitionId, + std::ostringstream* output, + facebook::velox::OStreamOutputStream* out, + facebook::velox::RowTypePtr* rowTypePtr) { + int64_t rawSize = batch_->size(); + batch_->flush(out); + const std::string& outputStr = output->str(); + RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, outputStr.c_str(), outputStr.size())); + batch_.reset(); + output->clear(); + output->str(""); + batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get()); + batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_); + return arrow::Status::OK(); +} + +arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) { + int32_t rowNum = 0; + const int32_t maxBatchNum = options_.bufferSize; + auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value()); + std::ostringstream output; + facebook::velox::OStreamOutputStream out(&output); + + if (options_.partitioning != Partitioning::kSingle) { + if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) { + auto rowVectorIndex = it->second; + const int32_t outputSize = rowVectorIndex.size(); + + std::map<int32_t, std::vector<facebook::velox::IndexRange>> groupedIndices; + std::map<int32_t, int64_t> groupedSize; + + int32_t tempVectorIndex = -1; + int32_t baseRowIndex = -1; + int32_t tempRowIndex = -1; + int32_t size = 1; + for (int start = 0; start < outputSize; start++) { + const int64_t rowVector = rowVectorIndex[start]; + const int32_t vectorIndex = static_cast<int32_t>(rowVector >> 32); + const int32_t rowIndex = static_cast<int32_t>(rowVector & 0xFFFFFFFFLL); + if (tempVectorIndex == -1) { + tempVectorIndex = vectorIndex; + baseRowIndex = rowIndex; + tempRowIndex = rowIndex; + } else { + if (vectorIndex == tempVectorIndex && rowIndex == tempRowIndex + 1) { + size += 1; + tempRowIndex = rowIndex; + } else { + groupedIndices[tempVectorIndex].push_back({baseRowIndex, size}); + groupedSize[tempVectorIndex] += size; + size = 1; + tempVectorIndex = vectorIndex; + baseRowIndex = rowIndex; + tempRowIndex = rowIndex; + } + } + } + groupedIndices[tempVectorIndex].push_back({baseRowIndex, size}); + groupedSize[tempVectorIndex] += size; + + for (auto& pair : groupedIndices) { + batch_->append(batches_[pair.first], pair.second); + rowNum += groupedSize[pair.first]; + if (rowNum >= maxBatchNum) { + rowNum = 0; + RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr)); + } + } + + rowVectorIndex.clear(); + rowVectorIndexMap_.erase(partitionId); + } + } else { + for (facebook::velox::RowVectorPtr rowVectorPtr : batches_) { + rowNum += rowVectorPtr->size(); + batch_->append(rowVectorPtr); + if (rowNum >= maxBatchNum) { + RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr)); + rowNum = 0; + } + } + } + if (rowNum > 0) { + RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr)); + } + return arrow::Status::OK(); +} + +arrow::Status VeloxSortBasedShuffleWriter::stop() { + for (auto pid = 0; pid < numPartitions(); ++pid) { + RETURN_NOT_OK(evictRowVector(pid)); + partition2RowCount_[pid] = 0; + } + batches_.clear(); + currentInputColumnBytes_ = 0; + { + SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]); + setSortState(SortState::kSortStop); + RETURN_NOT_OK(partitionWriter_->stop(&metrics_)); + partitionBuffers_.clear(); + } + + stat(); + + return arrow::Status::OK(); +} + +arrow::Status VeloxSortBasedShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) { + if (!rowType_.has_value()) { + rowType_ = rv.type(); + serdeOptions_ = { + false, facebook::velox::common::stringToCompressionKind(partitionWriter_->options().compressionTypeStr)}; + batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get()); + batch_->createStreamTree( + std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value()), + options_.bufferSize, + &serdeOptions_); + } + return arrow::Status::OK(); +} + +arrow::Status VeloxSortBasedShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual) { + if (evictState_ == EvictState::kUnevictable) { + *actual = 0; + return arrow::Status::OK(); + } + EvictGuard evictGuard{evictState_}; + + if (sortState_ == SortState::kSortInit) { + for (auto pid = 0; pid < numPartitions(); ++pid) { + RETURN_NOT_OK(evictRowVector(pid)); + partition2RowCount_[pid] = 0; + } + batches_.clear(); + *actual = currentInputColumnBytes_; + currentInputColumnBytes_ = 0; + } + return arrow::Status::OK(); +} + +void VeloxSortBasedShuffleWriter::stat() const { +#if VELOX_SHUFFLE_WRITER_LOG_FLAG + for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) { + std::ostringstream oss; + auto& timing = cpuWallTimingList_[i]; + oss << "Velox shuffle writer stat:" << CpuWallTimingName((CpuWallTimingType)i); + oss << " " << timing.toString(); + if (timing.count > 0) { + oss << " wallNanos-avg:" << timing.wallNanos / timing.count; + oss << " cpuNanos-avg:" << timing.cpuNanos / timing.count; + } + LOG(INFO) << oss.str(); + } +#endif +} + +void VeloxSortBasedShuffleWriter::setSortState(SortState state) { + sortState_ = state; +} + +} // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h new file mode 100644 index 000000000..e3ac07dfc --- /dev/null +++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <algorithm> +#include <memory> +#include <string> +#include <vector> + +#include "velox/common/time/CpuWallTimer.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/type/Type.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" +#include "velox/vector/VectorStream.h" + +#include <arrow/array/util.h> +#include <arrow/ipc/writer.h> +#include <arrow/memory_pool.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/type.h> + +#include "VeloxShuffleWriter.h" +#include "memory/VeloxMemoryManager.h" +#include "shuffle/PartitionWriter.h" +#include "shuffle/Partitioner.h" +#include "shuffle/Utils.h" + +#include "utils/Print.h" + +namespace gluten { + +enum SortState { kSortInit, kSort, kSortStop }; + +class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter { + public: + static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create( + uint32_t numPartitions, + std::unique_ptr<PartitionWriter> partitionWriter, + ShuffleWriterOptions options, + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, + arrow::MemoryPool* arrowPool); + + arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override; + + arrow::Status stop() override; + + arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override; + + arrow::Status evictRowVector(uint32_t partitionId) override; + + arrow::Status evictBatch( + uint32_t partitionId, + std::ostringstream* output, + facebook::velox::OStreamOutputStream* out, + facebook::velox::RowTypePtr* rowTypePtr); + + private: + VeloxSortBasedShuffleWriter( + uint32_t numPartitions, + std::unique_ptr<PartitionWriter> partitionWriter, + ShuffleWriterOptions options, + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, + arrow::MemoryPool* pool) + : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {} + + arrow::Status init(); + + arrow::Status initFromRowVector(const facebook::velox::RowVector& rv); + + void setSortState(SortState state); + + arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit); + + void stat() const; + + std::optional<facebook::velox::TypePtr> rowType_; + + std::unique_ptr<facebook::velox::VectorStreamGroup> batch_; + + // Partition ID -> Row Count + // subscript: Partition ID + // value: How many rows does this partition have in the current input RowVector + // Updated for each input RowVector. + std::vector<uint32_t> partition2RowCount_; + + std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde> serde_ = + std::make_unique<facebook::velox::serializer::presto::PrestoVectorSerde>(); + + std::vector<facebook::velox::RowVectorPtr> batches_; + + std::unordered_map<int32_t, std::vector<int64_t>> rowVectorIndexMap_; + + std::unordered_map<int32_t, std::vector<int64_t>> rowVectorPartitionMap_; + + uint32_t currentInputColumnBytes_ = 0; + + SortState sortState_{kSortInit}; +}; // class VeloxSortBasedShuffleWriter + +} // namespace gluten diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index ffda945b1..fdf3e4491 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -19,7 +19,9 @@ #include <arrow/io/api.h> #include "shuffle/LocalPartitionWriter.h" +#include "shuffle/VeloxHashBasedShuffleWriter.h" #include "shuffle/VeloxShuffleWriter.h" +#include "shuffle/VeloxSortBasedShuffleWriter.h" #include "shuffle/rss/RssPartitionWriter.h" #include "utils/TestUtils.h" #include "utils/VeloxArrowUtils.h" @@ -68,12 +70,18 @@ std::vector<ShuffleTestParams> createShuffleTestParams() { std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096}; for (const auto& compression : compressions) { + params.push_back(ShuffleTestParams{ShuffleWriterType::kSortShuffle, PartitionWriterType::kRss, compression, 0, 0}); for (const auto compressionThreshold : compressionThresholds) { for (const auto mergeBufferSize : mergeBufferSizes) { - params.push_back( - ShuffleTestParams{PartitionWriterType::kLocal, compression, compressionThreshold, mergeBufferSize}); + params.push_back(ShuffleTestParams{ + ShuffleWriterType::kHashShuffle, + PartitionWriterType::kLocal, + compression, + compressionThreshold, + mergeBufferSize}); } - params.push_back(ShuffleTestParams{PartitionWriterType::kRss, compression, compressionThreshold, 0}); + params.push_back(ShuffleTestParams{ + ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0}); } } @@ -264,7 +272,9 @@ TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) { auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get()); // calculate maxBatchSize_ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, hashInputVector1_)); - VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize); + if (GetParam().shuffleWriterType == kHashShuffle) { + VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize); + } auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, {{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}}); auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}}); @@ -305,6 +315,9 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) { } TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) { + if (GetParam().shuffleWriterType == kSortShuffle) { + return; + } ASSERT_NOT_OK(initShuffleWriterOptions()); shuffleWriterOptions_.bufferReallocThreshold = 0; // Force re-alloc on buffer size changed. auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get()); @@ -392,6 +405,9 @@ TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) { } TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) { + if (GetParam().shuffleWriterType == kSortShuffle) { + return; + } ASSERT_NOT_OK(initShuffleWriterOptions()); auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get()); diff --git a/cpp/velox/utils/tests/LocalRssClient.h b/cpp/velox/utils/tests/LocalRssClient.h index 0033526bb..c5c1b5d2c 100644 --- a/cpp/velox/utils/tests/LocalRssClient.h +++ b/cpp/velox/utils/tests/LocalRssClient.h @@ -30,7 +30,7 @@ class LocalRssClient : public RssClient { public: LocalRssClient(std::string dataFile) : dataFile_(dataFile) {} - int32_t pushPartitionData(int32_t partitionId, char* bytes, int64_t size) { + int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) { auto idx = -1; auto maybeIdx = partitionIdx_.find(partitionId); if (maybeIdx == partitionIdx_.end()) { diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 972c0cb25..66732c97a 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -62,6 +62,7 @@ std::unique_ptr<PartitionWriter> createPartitionWriter( } // namespace struct ShuffleTestParams { + ShuffleWriterType shuffleWriterType; PartitionWriterType partitionWriterType; arrow::Compression::type compressionType; int32_t compressionThreshold; @@ -69,8 +70,9 @@ struct ShuffleTestParams { std::string toString() const { std::ostringstream out; - out << "partitionWriterType = " << partitionWriterType << ", compressionType = " << compressionType - << ", compressionThreshold = " << compressionThreshold << ", mergeBufferSize = " << mergeBufferSize; + out << "shuffleWriterType = " << shuffleWriterType << ", partitionWriterType = " << partitionWriterType + << ", compressionType = " << compressionType << ", compressionThreshold = " << compressionThreshold + << ", mergeBufferSize = " << mergeBufferSize; return out.str(); } }; @@ -179,7 +181,7 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) { std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(vector); - return shuffleWriter.split(cb, ShuffleWriter::kMinMemLimit); + return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit); } // Create multiple local dirs and join with comma. @@ -231,11 +233,47 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams ShuffleTestParams params = GetParam(); partitionWriterOptions_.compressionType = params.compressionType; + switch (partitionWriterOptions_.compressionType) { + case arrow::Compression::UNCOMPRESSED: + partitionWriterOptions_.compressionTypeStr = "none"; + break; + case arrow::Compression::LZ4_FRAME: + partitionWriterOptions_.compressionTypeStr = "lz4"; + break; + case arrow::Compression::ZSTD: + partitionWriterOptions_.compressionTypeStr = "zstd"; + break; + default: + break; + }; partitionWriterOptions_.compressionThreshold = params.compressionThreshold; partitionWriterOptions_.mergeBufferSize = params.mergeBufferSize; return arrow::Status::OK(); } + std::shared_ptr<VeloxShuffleWriter> createSpecificShuffleWriter( + arrow::MemoryPool* arrowPool, + std::unique_ptr<PartitionWriter> partitionWriter, + ShuffleWriterOptions shuffleWriterOptions, + uint32_t numPartitions, + int32_t bufferSize) { + std::shared_ptr<VeloxShuffleWriter> shuffleWriter; + if (GetParam().shuffleWriterType == kHashShuffle) { + shuffleWriterOptions.bufferSize = bufferSize; + GLUTEN_ASSIGN_OR_THROW( + shuffleWriter, + VeloxHashBasedShuffleWriter::create( + numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions), pool_, arrowPool)); + } else if ( + GetParam().shuffleWriterType == kSortShuffle && GetParam().partitionWriterType == PartitionWriterType::kRss) { + GLUTEN_ASSIGN_OR_THROW( + shuffleWriter, + VeloxSortBasedShuffleWriter::create( + numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions), pool_, arrowPool)); + } + return shuffleWriter; + } + protected: static void SetUpTestCase() { facebook::velox::memory::MemoryManager::testingSetInstance({}); @@ -276,9 +314,33 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams options.compressionType = compressionType; auto codec = createArrowIpcCodec(options.compressionType, CodecBackend::NONE); auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema)); + switch (options.compressionType) { + case arrow::Compression::type::UNCOMPRESSED: + options.compressionTypeStr = "none"; + break; + case arrow::Compression::type::LZ4_FRAME: + options.compressionTypeStr = "lz4"; + break; + case arrow::Compression::type::ZSTD: + options.compressionTypeStr = "zstd"; + break; + default: + break; + }; + auto veloxCompressionType = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr); + if (!facebook::velox::isRegisteredVectorSerde()) { + facebook::velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); + } // Set batchSize to a large value to make all batches are merged by reader. auto deserializerFactory = std::make_unique<gluten::VeloxColumnarBatchDeserializerFactory>( - schema, std::move(codec), rowType, std::numeric_limits<int32_t>::max(), defaultArrowMemoryPool().get(), pool_); + schema, + std::move(codec), + veloxCompressionType, + rowType, + std::numeric_limits<int32_t>::max(), + defaultArrowMemoryPool().get(), + pool_, + GetParam().shuffleWriterType); auto reader = std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory)); auto iter = reader->readStream(in); while (iter->hasNext()) { @@ -316,15 +378,12 @@ class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest { } std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* arrowPool) override { - shuffleWriterOptions_.bufferSize = 10; shuffleWriterOptions_.partitioning = Partitioning::kSingle; static const uint32_t kNumPartitions = 1; auto partitionWriter = createPartitionWriter( GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowPool); - GLUTEN_ASSIGN_OR_THROW( - auto shuffleWriter, - VeloxShuffleWriter::create( - kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool)); + std::shared_ptr<VeloxShuffleWriter> shuffleWriter = createSpecificShuffleWriter( + arrowPool, std::move(partitionWriter), std::move(shuffleWriterOptions_), kNumPartitions, 10); return shuffleWriter; } }; @@ -387,15 +446,12 @@ class HashPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter { } std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* arrowPool) override { - shuffleWriterOptions_.bufferSize = 4; shuffleWriterOptions_.partitioning = Partitioning::kHash; static const uint32_t kNumPartitions = 2; auto partitionWriter = createPartitionWriter( GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowPool); - GLUTEN_ASSIGN_OR_THROW( - auto shuffleWriter, - VeloxShuffleWriter::create( - kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool)); + std::shared_ptr<VeloxShuffleWriter> shuffleWriter = createSpecificShuffleWriter( + arrowPool, std::move(partitionWriter), std::move(shuffleWriterOptions_), kNumPartitions, 4); return shuffleWriter; } @@ -422,15 +478,12 @@ class RangePartitioningShuffleWriter : public MultiplePartitioningShuffleWriter } std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* arrowPool) override { - shuffleWriterOptions_.bufferSize = 4; shuffleWriterOptions_.partitioning = Partitioning::kRange; static const uint32_t kNumPartitions = 2; auto partitionWriter = createPartitionWriter( GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowPool); - GLUTEN_ASSIGN_OR_THROW( - auto shuffleWriter, - VeloxShuffleWriter::create( - kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool)); + std::shared_ptr<VeloxShuffleWriter> shuffleWriter = createSpecificShuffleWriter( + arrowPool, std::move(partitionWriter), std::move(shuffleWriterOptions_), kNumPartitions, 4); return shuffleWriter; } @@ -441,7 +494,7 @@ class RangePartitioningShuffleWriter : public MultiplePartitioningShuffleWriter facebook::velox::TypePtr dataType, std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) { /* blockId = pid, rowVector in block */ for (auto& batch : batches) { - ASSERT_NOT_OK(shuffleWriter.split(batch, ShuffleWriter::kMinMemLimit)); + ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit)); } shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, dataType, expectedVectors); } @@ -453,14 +506,11 @@ class RangePartitioningShuffleWriter : public MultiplePartitioningShuffleWriter class RoundRobinPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter { protected: std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* arrowPool) override { - shuffleWriterOptions_.bufferSize = 4; static const uint32_t kNumPartitions = 2; auto partitionWriter = createPartitionWriter( GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowPool); - GLUTEN_ASSIGN_OR_THROW( - auto shuffleWriter, - VeloxShuffleWriter::create( - kNumPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool)); + std::shared_ptr<VeloxShuffleWriter> shuffleWriter = createSpecificShuffleWriter( + arrowPool, std::move(partitionWriter), std::move(shuffleWriterOptions_), kNumPartitions, 4); return shuffleWriter; } }; @@ -479,7 +529,7 @@ class VeloxShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public t PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowPool); GLUTEN_ASSIGN_OR_THROW( auto shuffleWriter, - VeloxShuffleWriter::create( + VeloxHashBasedShuffleWriter::create( numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), pool_, arrowPool)); return shuffleWriter; } diff --git a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index a1a41f973..f454cf00c 100644 --- a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterators; import org.apache.celeborn.client.LifecycleManager; import org.apache.celeborn.client.ShuffleClient; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.protocol.ShuffleMode; import org.apache.spark.*; import org.apache.spark.shuffle.*; import org.apache.spark.shuffle.celeborn.*; @@ -291,10 +290,6 @@ public class CelebornShuffleManager implements ShuffleManager { shuffleId = h.dependency().shuffleId(); } - if (!ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) { - throw new UnsupportedOperationException( - "Unrecognized shuffle write mode!" + celebornConf.shuffleWriterMode()); - } if (h.dependency() instanceof ColumnarShuffleDependency) { // columnar-based shuffle return writerFactory.createShuffleWriterInstance( diff --git a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala index 292ff3cc1..efd891498 100644 --- a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala @@ -29,6 +29,7 @@ import org.apache.celeborn.client.ShuffleClient import org.apache.celeborn.common.CelebornConf import java.io.IOException +import java.util.Locale abstract class CelebornHashBasedColumnarShuffleWriter[K, V]( shuffleId: Int, @@ -53,6 +54,13 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V]( protected val clientPushBufferMaxSize: Int = celebornConf.clientPushBufferMaxSize + protected val clientPushSortMemoryThreshold: Long = celebornConf.clientPushSortMemoryThreshold + + protected val clientSortMemoryMaxSize: Long = celebornConf.clientPushSortMemoryThreshold + + protected val shuffleWriterType: String = + celebornConf.shuffleWriterMode.name.toLowerCase(Locale.ROOT) + protected val celebornPartitionPusher = new CelebornPartitionPusher( shuffleId, numMappers, diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index d72977f59..699626db1 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -39,6 +39,7 @@ import org.apache.celeborn.client.read.CelebornInputStream import java.io._ import java.nio.ByteBuffer +import java.util.Locale import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean @@ -83,6 +84,8 @@ private class CelebornColumnarBatchSerializerInstance( } val compressionCodecBackend = GlutenConfig.getConf.columnarShuffleCodecBackend.orNull + val shuffleWriterType = + conf.get("spark.celeborn.client.spark.shuffle.writer", "hash").toLowerCase(Locale.ROOT) val jniWrapper = ShuffleReaderJniWrapper.create() val batchSize = GlutenConfig.getConf.maxBatchSize val handle = jniWrapper @@ -91,7 +94,8 @@ private class CelebornColumnarBatchSerializerInstance( nmm.getNativeInstanceHandle, compressionCodec, compressionCodecBackend, - batchSize + batchSize, + shuffleWriterType ) // Close shuffle reader instance as lately as the end of task processing, // since the native reader could hold a reference to memory pool that diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala index 75d5148cd..37ea11a73 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala @@ -69,6 +69,12 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( } } + private val memoryLimit: Long = if ("sort".equals(shuffleWriterType)) { + Math.min(clientSortMemoryMaxSize, clientPushBufferMaxSize * numPartitions) + } else { + availableOffHeapPerTask() + } + private def availableOffHeapPerTask(): Long = { val perTask = SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf) @@ -97,6 +103,7 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, clientPushBufferMaxSize, + clientPushSortMemoryThreshold, celebornPartitionPusher, NativeMemoryManagers .create( @@ -127,11 +134,12 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( context.taskAttemptId(), GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId), "celeborn", + shuffleWriterType, GlutenConfig.getConf.columnarShuffleReallocThreshold ) } val startTime = System.nanoTime() - jniWrapper.split(nativeShuffleWriter, cb.numRows, handle, availableOffHeapPerTask()) + jniWrapper.write(nativeShuffleWriter, cb.numRows, handle, availableOffHeapPerTask()) dep.metrics("splitTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(cb.numRows) dep.metrics("inputBatches").add(1) diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java index 411907ae3..24425ccf7 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java @@ -41,7 +41,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware { long memoryManagerHandle, String compressionType, String compressionCodecBackend, - int batchSize); + int batchSize, + String shuffleWriterType); public native long readStream(long shuffleReaderHandle, JniByteInputStream jniIn); diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java index 243c90599..ed312fa14 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java @@ -90,8 +90,10 @@ public class ShuffleWriterJniWrapper implements RuntimeAware { taskAttemptId, startPartitionId, 0, + 0, null, - "local"); + "local", + "hash"); } /** @@ -110,12 +112,14 @@ public class ShuffleWriterJniWrapper implements RuntimeAware { int bufferCompressThreshold, String compressionMode, int pushBufferMaxSize, + long sortBufferMaxSize, Object pusher, long memoryManagerHandle, long handle, long taskAttemptId, int startPartitionId, String partitionWriterType, + String shuffleWriterType, double reallocThreshold) { return nativeMake( part.getShortName(), @@ -137,8 +141,10 @@ public class ShuffleWriterJniWrapper implements RuntimeAware { taskAttemptId, startPartitionId, pushBufferMaxSize, + sortBufferMaxSize, pusher, - partitionWriterType); + partitionWriterType, + shuffleWriterType); } public native long nativeMake( @@ -161,8 +167,10 @@ public class ShuffleWriterJniWrapper implements RuntimeAware { long taskAttemptId, int startPartitionId, int pushBufferMaxSize, + long sortBufferMaxSize, Object pusher, - String partitionWriterType); + String partitionWriterType, + String shuffleWriterType); /** * Evict partition data. @@ -187,7 +195,7 @@ public class ShuffleWriterJniWrapper implements RuntimeAware { * allocator instead * @return batch bytes. */ - public native long split(long shuffleWriterHandle, int numRows, long handler, long memLimit); + public native long write(long shuffleWriterHandle, int numRows, long handler, long memLimit); /** * Write the data remained in the buffers hold by native shuffle writer to each partition's diff --git a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index e632700e3..69e9aa9c9 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -102,7 +102,9 @@ private class ColumnarBatchSerializerInstance( nmm.getNativeInstanceHandle, compressionCodec, compressionCodecBackend, - batchSize) + batchSize, + "hash" + ) // Close shuffle reader instance as lately as the end of task processing, // since the native reader could hold a reference to memory pool that // was used to create all buffers read from shuffle reader. The pool diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index fb933866c..c797257f1 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -181,7 +181,7 @@ class ColumnarShuffleWriter[K, V]( ) } val startTime = System.nanoTime() - jniWrapper.split(nativeShuffleWriter, rows, handle, availableOffHeapPerTask()) + jniWrapper.write(nativeShuffleWriter, rows, handle, availableOffHeapPerTask()) dep.metrics("splitTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(rows) dep.metrics("inputBatches").add(1) diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 17cfce1c0..c0063c6f4 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -145,6 +145,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K, compressThreshold, GlutenConfig.getConf().columnarShuffleCompressionMode(), bufferSize, + bufferSize, partitionPusher, NativeMemoryManagers.create( "UniffleShuffleWriter", @@ -180,12 +181,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K, GlutenShuffleUtils.getStartPartitionId( columnarDep.nativePartitioning(), partitionId), "uniffle", + "hash", reallocThreshold); } long startTime = System.nanoTime(); long bytes = - jniWrapper.split(nativeShuffleWriter, cb.numRows(), handle, availableOffHeapPerTask()); - LOG.debug("jniWrapper.split rows {}, split bytes {}", cb.numRows(), bytes); + jniWrapper.write(nativeShuffleWriter, cb.numRows(), handle, availableOffHeapPerTask()); + LOG.debug("jniWrapper.write rows {}, split bytes {}", cb.numRows(), bytes); columnarDep.metrics().get("dataSize").get().add(bytes); // this metric replace part of uniffle shuffle write time columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - startTime); diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index ca8a9dce1..02c6bf7fe 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -134,6 +134,11 @@ class GlutenConfig(conf: SQLConf) extends Logging { .getConfString("spark.shuffle.manager", "sort") .contains("UniffleShuffleManager") + def isSortBasedCelebornShuffle: Boolean = + conf + .getConfString("spark.celeborn.client.spark.shuffle.writer", "hash") + .equals("sort") + def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED) def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org