This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1b8d42026f [GLUTEN-9163][VL] Use stream de/compressor in sort-based
shuffle (#9278)
1b8d42026f is described below
commit 1b8d42026fbe7c4512af459ef75039cdd09d3b0b
Author: Rong Ma <[email protected]>
AuthorDate: Thu Apr 17 17:12:28 2025 +0100
[GLUTEN-9163][VL] Use stream de/compressor in sort-based shuffle (#9278)
---
cpp/core/jni/JniWrapper.cc | 8 +-
cpp/core/shuffle/LocalPartitionWriter.cc | 210 +++++++----
cpp/core/shuffle/LocalPartitionWriter.h | 14 +-
cpp/core/shuffle/Options.h | 8 +-
cpp/core/shuffle/PartitionWriter.h | 10 +-
cpp/core/shuffle/Payload.cc | 24 +-
cpp/core/shuffle/Payload.h | 20 +-
cpp/core/shuffle/Spill.cc | 6 -
cpp/core/shuffle/Spill.h | 7 -
cpp/core/shuffle/Utils.h | 395 +++++++++++++++++++++
cpp/core/shuffle/rss/RssPartitionWriter.cc | 67 +++-
cpp/core/shuffle/rss/RssPartitionWriter.h | 97 ++++-
cpp/velox/benchmarks/GenericBenchmark.cc | 8 +
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 10 +-
cpp/velox/shuffle/VeloxShuffleReader.cc | 125 +++----
cpp/velox/shuffle/VeloxShuffleReader.h | 18 +-
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 76 ++--
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 8 +-
cpp/velox/tests/VeloxShuffleWriterTest.cc | 19 +-
cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 8 +-
20 files changed, 821 insertions(+), 317 deletions(-)
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 512adf9e62..c761920c87 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -825,10 +825,10 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jstring codecJstr,
jstring codecBackendJstr,
jint compressionLevel,
- jint sortEvictBufferSize,
+ jint diskWriteBufferSize,
jint compressionThreshold,
jstring compressionModeJstr,
- jint sortBufferInitialSize,
+ jint initialSortBufferSize,
jboolean useRadixSort,
jstring dataFileJstr,
jint numSubDirs,
@@ -856,8 +856,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.taskAttemptId = static_cast<int64_t>(taskAttemptId),
.startPartitionId = startPartitionId,
.shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env,
shuffleWriterTypeJstr)),
- .sortBufferInitialSize = sortBufferInitialSize,
- .sortEvictBufferSize = sortEvictBufferSize,
+ .initialSortBufferSize = initialSortBufferSize,
+ .diskWriteBufferSize = diskWriteBufferSize,
.useRadixSort = static_cast<bool>(useRadixSort)};
// Build PartitionWriterOptions.
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 2cca7f4c69..d31cdde68a 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -15,33 +15,76 @@
* limitations under the License.
*/
-#include <filesystem>
-#include <random>
-#include <thread>
-
-#include <boost/stacktrace.hpp>
-#include <glog/logging.h>
#include "shuffle/LocalPartitionWriter.h"
+
#include "shuffle/Payload.h"
#include "shuffle/Spill.h"
#include "shuffle/Utils.h"
+#include "utils/Timer.h"
-namespace gluten {
+#include <fcntl.h>
+#include <glog/logging.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <filesystem>
+#include <random>
+#include <thread>
+namespace gluten {
class LocalPartitionWriter::LocalSpiller {
public:
LocalSpiller(
+ bool isFinal,
std::shared_ptr<arrow::io::OutputStream> os,
std::string spillFile,
uint32_t compressionThreshold,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
- : os_(os),
+ : isFinal_(isFinal),
+ os_(os),
spillFile_(std::move(spillFile)),
compressionThreshold_(compressionThreshold),
pool_(pool),
codec_(codec),
-
diskSpill_(std::make_unique<Spill>(Spill::SpillType::kSequentialSpill)) {}
+ diskSpill_(std::make_unique<Spill>()) {
+ if (codec_ != nullptr) {
+ GLUTEN_ASSIGN_OR_THROW(
+ compressedOs_, ShuffleCompressedOutputStream::Make(codec_, os,
arrow::default_memory_pool()));
+ }
+ }
+
+ arrow::Status flush() {
+ if (flushed_) {
+ return arrow::Status::OK();
+ }
+ flushed_ = true;
+
+ if (compressedOs_ != nullptr) {
+ RETURN_NOT_OK(compressedOs_->Flush());
+ }
+ ARROW_ASSIGN_OR_RAISE(const auto pos, os_->Tell());
+
+ diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos -
writePos_, pool_, nullptr);
+ DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file
start: " << writePos_
+ << ", file end: " << pos << ", file: " << spillFile_;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status spill(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
payload) {
+ ScopedTimer timer(&spillTime_);
+
+ if (lastPid_ != partitionId) {
+ // Record the write position of the new partition.
+ ARROW_ASSIGN_OR_RAISE(writePos_, os_->Tell());
+ lastPid_ = partitionId;
+ }
+
+ flushed_ = false;
+ auto* raw = compressedOs_ != nullptr ? compressedOs_.get() : os_.get();
+ RETURN_NOT_OK(payload->serialize(raw));
+
+ return arrow::Status::OK();
+ }
arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload>
payload) {
// Check spill Type.
@@ -55,10 +98,6 @@ class LocalPartitionWriter::LocalSpiller {
ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
DLOG(INFO) << "LocalSpiller: Spilled partition " << partitionId << " file
start: " << start << ", file end: " << end
<< ", file: " << spillFile_;
- if (payload->type() == Payload::kRaw) {
- diskSpill_->insertPayload(partitionId, Payload::kRaw, 0, nullptr, end -
start, pool_, nullptr);
- return arrow::Status::OK();
- }
auto payloadType = payload->type();
if (payloadType == Payload::kUncompressed && codec_ != nullptr &&
payload->numRows() >= compressionThreshold_) {
@@ -69,17 +108,34 @@ class LocalPartitionWriter::LocalSpiller {
return arrow::Status::OK();
}
- arrow::Result<std::shared_ptr<Spill>> finish(bool close) {
+ arrow::Result<std::shared_ptr<Spill>> finish() {
ARROW_RETURN_IF(finished_, arrow::Status::Invalid("Calling finish() on a
finished LocalSpiller."));
ARROW_RETURN_IF(os_->closed(), arrow::Status::Invalid("Spill file os has
been closed."));
- finished_ = true;
- if (close) {
+ if (lastPid_ != -1) {
+ if (compressedOs_ != nullptr) {
+ compressTime_ = compressedOs_->compressTime();
+ spillTime_ -= compressTime_;
+ RETURN_NOT_OK(compressedOs_->Close());
+ }
+
+ if (!isFinal_) {
+ ARROW_ASSIGN_OR_RAISE(auto pos, os_->Tell());
+ diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos -
writePos_, pool_, nullptr);
+ DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file
start: " << writePos_
+ << ", file end: " << pos << ", file: " << spillFile_;
+ }
+ }
+
+ if (!isFinal_) {
RETURN_NOT_OK(os_->Close());
}
+
diskSpill_->setSpillFile(spillFile_);
diskSpill_->setSpillTime(spillTime_);
diskSpill_->setCompressTime(compressTime_);
+ finished_ = true;
+
return std::move(diskSpill_);
}
@@ -88,28 +144,31 @@ class LocalPartitionWriter::LocalSpiller {
}
private:
+ bool isFinal_;
+
std::shared_ptr<arrow::io::OutputStream> os_;
+ std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr};
+ int64_t writePos_{0};
+
std::string spillFile_;
uint32_t compressionThreshold_;
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
- bool finished_{false};
std::shared_ptr<Spill> diskSpill_{nullptr};
+
+ bool flushed_{true};
+ bool finished_{false};
int64_t spillTime_{0};
int64_t compressTime_{0};
+ int32_t lastPid_{-1};
};
class LocalPartitionWriter::PayloadMerger {
public:
- PayloadMerger(
- const PartitionWriterOptions& options,
- arrow::MemoryPool* pool,
- arrow::util::Codec* codec,
- bool hasComplexType)
+ PayloadMerger(const PartitionWriterOptions& options, arrow::MemoryPool*
pool, arrow::util::Codec* codec)
: pool_(pool),
codec_(codec),
- hasComplexType_(hasComplexType),
compressionThreshold_(options.compressionThreshold),
mergeBufferSize_(options.mergeBufferSize),
mergeBufferMinSize_(options.mergeBufferSize * options.mergeThreshold)
{}
@@ -117,7 +176,7 @@ class LocalPartitionWriter::PayloadMerger {
arrow::Result<std::vector<std::unique_ptr<BlockPayload>>>
merge(uint32_t partitionId, std::unique_ptr<InMemoryPayload> append, bool
reuseBuffers) {
std::vector<std::unique_ptr<BlockPayload>> merged{};
- if (hasComplexType_) {
+ if (!append->mergeable()) {
// TODO: Merging complex type is currently not supported.
merged.emplace_back();
ARROW_ASSIGN_OR_RAISE(merged.back(),
createBlockPayload(std::move(append), reuseBuffers));
@@ -215,7 +274,6 @@ class LocalPartitionWriter::PayloadMerger {
private:
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
- bool hasComplexType_;
int32_t compressionThreshold_;
int32_t mergeBufferSize_;
int32_t mergeBufferMinSize_;
@@ -332,7 +390,7 @@ class LocalPartitionWriter::PayloadCache {
spillTime_ += payload->getWriteTime();
if (UNLIKELY(!diskSpill)) {
- diskSpill =
std::make_unique<Spill>(Spill::SpillType::kBatchedSpill);
+ diskSpill = std::make_unique<Spill>();
}
ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file
start: " << start << ", file end: " << end
@@ -419,25 +477,34 @@ void LocalPartitionWriter::init() {
subDirSelection_.assign(localDirs_.size(), 0);
}
-arrow::Status LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
- auto spillId = 0;
- auto spillIter = spills_.begin();
- while (spillIter != spills_.end()) {
- ARROW_ASSIGN_OR_RAISE(auto st, dataFileOs_->Tell());
- (*spillIter)->openForRead(options_.shuffleFileBufferSize);
- // Read if partition exists in the spilled file and write to the final
file.
- while (auto payload = (*spillIter)->nextPayload(partitionId)) {
+arrow::Result<int64_t> LocalPartitionWriter::mergeSpills(uint32_t partitionId)
{
+ int64_t bytesEvicted = 0;
+ int32_t spillIndex = 0;
+
+ for (const auto& spill : spills_) {
+ ARROW_ASSIGN_OR_RAISE(auto startPos, dataFileOs_->Tell());
+
+ spill->openForRead(options_.shuffleFileBufferSize);
+
+ // Read if partition exists in the spilled file. Then write to the final
data file.
+ while (auto payload = spill->nextPayload(partitionId)) {
// May trigger spill during compression.
RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
compressTime_ += payload->getCompressTime();
writeTime_ += payload->getWriteTime();
}
- ++spillIter;
- ARROW_ASSIGN_OR_RAISE(auto ed, dataFileOs_->Tell());
- DLOG(INFO) << "Partition " << partitionId << " spilled from spillResult "
<< spillId++ << " of bytes " << ed - st;
- totalBytesEvicted_ += (ed - st);
+
+ ARROW_ASSIGN_OR_RAISE(auto endPos, dataFileOs_->Tell());
+ auto bytesWritten = endPos - startPos;
+
+ DLOG(INFO) << "Partition " << partitionId << " spilled from spillResult "
<< spillIndex++ << " of bytes "
+ << bytesWritten;
+
+ bytesEvicted += bytesWritten;
}
- return arrow::Status::OK();
+
+ totalBytesEvicted_ += bytesEvicted;
+ return bytesEvicted;
}
arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
@@ -447,17 +514,13 @@ arrow::Status
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
stopped_ = true;
if (useSpillFileAsDataFile_) {
- RETURN_NOT_OK(finishSpill(false));
- // The last spill has been written to data file.
- auto spill = std::move(spills_.back());
- spills_.pop_back();
+ RETURN_NOT_OK(spiller_->flush());
+ ARROW_ASSIGN_OR_RAISE(auto spill, spiller_->finish());
// Merge the remaining partitions from spills.
- if (spills_.size() > 0) {
+ if (!spills_.empty()) {
for (auto pid = lastEvictPid_ + 1; pid < numPartitions_; ++pid) {
- auto bytesEvicted = totalBytesEvicted_;
- RETURN_NOT_OK(mergeSpills(pid));
- partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
+ ARROW_ASSIGN_OR_RAISE(partitionLengths_[pid], mergeSpills(pid));
}
}
@@ -469,7 +532,7 @@ arrow::Status
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
writeTime_ = spill->spillTime();
compressTime_ += spill->compressTime();
} else {
- RETURN_NOT_OK(finishSpill(true));
+ RETURN_NOT_OK(finishSpill());
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
int64_t endInFinalFile = 0;
@@ -527,6 +590,7 @@ arrow::Status LocalPartitionWriter::requestSpill(bool
isFinal) {
std::string spillFile;
std::shared_ptr<arrow::io::OutputStream> os;
if (isFinal) {
+ // If `spill()` is requested after `stop()`, open the final data file
for writing.
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
spillFile = dataFile_;
os = dataFileOs_;
@@ -536,17 +600,16 @@ arrow::Status LocalPartitionWriter::requestSpill(bool
isFinal) {
ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
}
spiller_ = std::make_unique<LocalSpiller>(
- os, std::move(spillFile), options_.compressionThreshold,
payloadPool_.get(), codec_.get());
+ isFinal, os, std::move(spillFile), options_.compressionThreshold,
payloadPool_.get(), codec_.get());
}
return arrow::Status::OK();
}
-arrow::Status LocalPartitionWriter::finishSpill(bool close) {
- // Finish the spiller. No compression, no spill.
+arrow::Status LocalPartitionWriter::finishSpill() {
if (spiller_ && !spiller_->finished()) {
auto spiller = std::move(spiller_);
spills_.emplace_back();
- ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish(close));
+ ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish());
}
return arrow::Status::OK();
}
@@ -555,8 +618,7 @@ arrow::Status LocalPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers,
- bool hasComplexType) {
+ bool reuseBuffers) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (evictType == Evict::kSpill) {
@@ -568,8 +630,7 @@ arrow::Status LocalPartitionWriter::hashEvict(
}
if (!merger_) {
- merger_ =
- std::make_shared<PayloadMerger>(options_, payloadPool_.get(), codec_ ?
codec_.get() : nullptr, hasComplexType);
+ merger_ = std::make_shared<PayloadMerger>(options_, payloadPool_.get(),
codec_ ? codec_.get() : nullptr);
}
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->merge(partitionId,
std::move(inMemoryPayload), reuseBuffers));
if (!merged.empty()) {
@@ -584,43 +645,38 @@ arrow::Status LocalPartitionWriter::hashEvict(
return arrow::Status::OK();
}
-arrow::Status LocalPartitionWriter::sortEvict(
- uint32_t partitionId,
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- std::shared_ptr<arrow::Buffer> compressed,
- bool isFinal) {
+arrow::Status
+LocalPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal &&
!dataFileOs_))) {
lastEvictPid_ = -1;
- RETURN_NOT_OK(finishSpill(true));
+ RETURN_NOT_OK(finishSpill());
}
RETURN_NOT_OK(requestSpill(isFinal));
- auto payloadType = codec_ ? Payload::Type::kCompressed :
Payload::Type::kUncompressed;
- ARROW_ASSIGN_OR_RAISE(
- auto payload,
- inMemoryPayload->toBlockPayload(
- payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr,
std::move(compressed)));
- if (!isFinal) {
- RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
- } else {
- if (spills_.size() > 0) {
+ if (lastEvictPid_ != partitionId) {
+ // Flush the remaining data for lastEvictPid_.
+ RETURN_NOT_OK(spiller_->flush());
+
+ // For final data file, merge all spills for partitions in [lastEvictPid_
+ 1, partitionId]. Note in this function,
+ // only the spilled partitions before partitionId are merged. Therefore,
the remaining partitions after partitionId
+ // are not merged here and will be merged in `stop()`.
+ if (isFinal && !spills_.empty()) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
- auto bytesEvicted = totalBytesEvicted_;
- RETURN_NOT_OK(mergeSpills(pid));
- partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
+ ARROW_ASSIGN_OR_RAISE(partitionLengths_[pid], mergeSpills(pid));
}
}
- RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
}
+
+ RETURN_NOT_OK(spiller_->spill(partitionId, std::move(inMemoryPayload)));
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t*
actual) {
// Finish last spiller.
- RETURN_NOT_OK(finishSpill(true));
+ RETURN_NOT_OK(finishSpill());
int64_t reclaimed = 0;
// Reclaim memory from payloadCache.
@@ -651,7 +707,7 @@ arrow::Status
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
// This is not accurate. When the evicted partition buffers are not
copied, the merged ones
// are resized from the original buffers thus allocated from
partitionBufferPool.
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
- RETURN_NOT_OK(finishSpill(true));
+ RETURN_NOT_OK(finishSpill());
}
*actual = reclaimed;
return arrow::Status::OK();
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h
b/cpp/core/shuffle/LocalPartitionWriter.h
index de1bce8e74..02cc16c565 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -38,14 +38,10 @@ class LocalPartitionWriter : public PartitionWriter {
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers,
- bool hasComplexType) override;
+ bool reuseBuffers) override;
- arrow::Status sortEvict(
- uint32_t partitionId,
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- std::shared_ptr<arrow::Buffer> compressed,
- bool isFinal) override;
+ arrow::Status sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
+ override;
// This code path is not used by LocalPartitionWriter, Not implement it by
default.
arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload>
blockPayload, bool stop) override {
@@ -90,13 +86,13 @@ class LocalPartitionWriter : public PartitionWriter {
arrow::Status requestSpill(bool isFinal);
- arrow::Status finishSpill(bool close);
+ arrow::Status finishSpill();
std::string nextSpilledFileDir();
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const
std::string& file);
- arrow::Status mergeSpills(uint32_t partitionId);
+ arrow::Result<int64_t> mergeSpills(uint32_t partitionId);
arrow::Status clearResource();
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 5e98f82912..07dd2999b0 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -30,7 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 <<
20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
-static constexpr int32_t kDefaultSortEvictBufferSize = 32 * 1024;
+static constexpr int32_t kDefaultDiskWriteBufferSize = 32 * 1024; // TODO:
compare performance with 1M (spark default)
static const std::string kDefaultCompressionTypeStr = "lz4";
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
@@ -65,9 +65,9 @@ struct ShuffleWriterOptions {
ShuffleWriterType shuffleWriterType = kHashShuffle;
// Sort shuffle writer.
- int32_t sortBufferInitialSize = kDefaultSortBufferSize;
- int32_t sortEvictBufferSize = kDefaultSortEvictBufferSize;
- bool useRadixSort = kDefaultUseRadixSort;
+ int32_t initialSortBufferSize = kDefaultSortBufferSize; //
spark.shuffle.sort.initialBufferSize
+ int32_t diskWriteBufferSize = kDefaultDiskWriteBufferSize; //
spark.shuffle.spill.diskWriteBufferSize
+ bool useRadixSort = kDefaultUseRadixSort; // spark.shuffle.sort.useRadixSort
};
struct PartitionWriterOptions {
diff --git a/cpp/core/shuffle/PartitionWriter.h
b/cpp/core/shuffle/PartitionWriter.h
index 3a44d38365..171efed0a3 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -46,14 +46,10 @@ class PartitionWriter : public Reclaimable {
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers,
- bool hasComplexType) = 0;
+ bool reuseBuffers) = 0;
- virtual arrow::Status sortEvict(
- uint32_t partitionId,
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- std::shared_ptr<arrow::Buffer> compressed,
- bool isFinal) = 0;
+ virtual arrow::Status
+ sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
inMemoryPayload, bool isFinal) = 0;
std::optional<int64_t> getCompressedBufferLength(const
std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
if (!codec_) {
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index ddf4a40966..602b49b39e 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -354,6 +354,7 @@ arrow::Result<std::unique_ptr<InMemoryPayload>>
InMemoryPayload::merge(
std::unique_ptr<InMemoryPayload> source,
std::unique_ptr<InMemoryPayload> append,
arrow::MemoryPool* pool) {
+ GLUTEN_DCHECK(source->mergeable() && append->mergeable(), "Cannot merge
payloads.");
auto mergedRows = source->numRows() + append->numRows();
auto isValidityBuffer = source->isValidityBuffer();
@@ -435,10 +436,19 @@ arrow::Result<std::unique_ptr<BlockPayload>>
InMemoryPayload::toBlockPayload(
}
arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream*
outputStream) {
- return arrow::Status::Invalid("Cannot serialize InMemoryPayload.");
+ for (auto& buffer : buffers_) {
+ RETURN_NOT_OK(outputStream->Write(buffer->data(), buffer->size()));
+ buffer = nullptr;
+ }
+ buffers_.clear();
+ return arrow::Status::OK();
}
arrow::Result<std::shared_ptr<arrow::Buffer>>
InMemoryPayload::readBufferAt(uint32_t index) {
+ GLUTEN_CHECK(
+ index < buffers_.size(),
+ "buffer index out of range: index = " + std::to_string(index) +
+ " vs buffer size = " + std::to_string(buffers_.size()));
return std::move(buffers_[index]);
}
@@ -462,6 +472,10 @@ int64_t InMemoryPayload::rawSize() {
return getBufferSize(buffers_);
}
+bool InMemoryPayload::mergeable() const {
+ return !hasComplexType_;
+}
+
UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
Type type,
uint32_t numRows,
@@ -476,10 +490,6 @@ UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
pool_(pool),
codec_(codec) {}
-arrow::Result<std::shared_ptr<arrow::Buffer>>
UncompressedDiskBlockPayload::readBufferAt(uint32_t index) {
- return arrow::Status::Invalid("Cannot read buffer from
UncompressedDiskBlockPayload.");
-}
-
arrow::Status UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream*
outputStream) {
ARROW_RETURN_IF(
inputStream_ == nullptr, arrow::Status::Invalid("inputStream_ is
uninitialized before calling serialize()."));
@@ -556,10 +566,6 @@ arrow::Status
CompressedDiskBlockPayload::serialize(arrow::io::OutputStream* out
return arrow::Status::OK();
}
-arrow::Result<std::shared_ptr<arrow::Buffer>>
CompressedDiskBlockPayload::readBufferAt(uint32_t index) {
- return arrow::Status::Invalid("Cannot read buffer from
CompressedDiskBlockPayload.");
-}
-
int64_t CompressedDiskBlockPayload::rawSize() {
return rawSize_;
}
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 611b2310d5..efc70b3fbe 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -36,8 +36,6 @@ class Payload {
virtual arrow::Status serialize(arrow::io::OutputStream* outputStream) = 0;
- virtual arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t
index) = 0;
-
virtual int64_t rawSize() = 0;
int64_t getCompressTime() const {
@@ -101,7 +99,7 @@ class BlockPayload final : public Payload {
arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
- arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos)
override;
+ arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos);
int64_t rawSize() override;
@@ -127,15 +125,18 @@ class InMemoryPayload final : public Payload {
InMemoryPayload(
uint32_t numRows,
const std::vector<bool>* isValidityBuffer,
- std::vector<std::shared_ptr<arrow::Buffer>> buffers)
- : Payload(Type::kUncompressed, numRows, isValidityBuffer),
buffers_(std::move(buffers)) {}
+ std::vector<std::shared_ptr<arrow::Buffer>> buffers,
+ bool hasComplexType = false)
+ : Payload(Type::kUncompressed, numRows, isValidityBuffer),
+ buffers_(std::move(buffers)),
+ hasComplexType_(hasComplexType) {}
static arrow::Result<std::unique_ptr<InMemoryPayload>>
merge(std::unique_ptr<InMemoryPayload> source,
std::unique_ptr<InMemoryPayload> append, arrow::MemoryPool* pool);
arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
- arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index)
override;
+ arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index);
arrow::Result<std::unique_ptr<BlockPayload>> toBlockPayload(
Payload::Type payloadType,
@@ -147,8 +148,11 @@ class InMemoryPayload final : public Payload {
int64_t rawSize() override;
+ bool mergeable() const;
+
private:
std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
+ bool hasComplexType_;
};
class UncompressedDiskBlockPayload final : public Payload {
@@ -162,8 +166,6 @@ class UncompressedDiskBlockPayload final : public Payload {
arrow::MemoryPool* pool,
arrow::util::Codec* codec);
- arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index)
override;
-
arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
int64_t rawSize() override;
@@ -189,8 +191,6 @@ class CompressedDiskBlockPayload final : public Payload {
arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
- arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index)
override;
-
int64_t rawSize() override;
private:
diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc
index 9b60d4bc3e..1a88a0b11f 100644
--- a/cpp/core/shuffle/Spill.cc
+++ b/cpp/core/shuffle/Spill.cc
@@ -21,8 +21,6 @@
namespace gluten {
-Spill::Spill(Spill::SpillType type) : type_(type) {}
-
Spill::~Spill() {
if (is_) {
static_cast<void>(is_->Close());
@@ -77,10 +75,6 @@ void Spill::openForRead(uint64_t shuffleFileBufferSize) {
}
}
-Spill::SpillType Spill::type() const {
- return type_;
-}
-
void Spill::setSpillFile(const std::string& spillFile) {
spillFile_ = spillFile;
}
diff --git a/cpp/core/shuffle/Spill.h b/cpp/core/shuffle/Spill.h
index fd692537c5..1bf55152ad 100644
--- a/cpp/core/shuffle/Spill.h
+++ b/cpp/core/shuffle/Spill.h
@@ -29,14 +29,8 @@ namespace gluten {
class Spill final {
public:
- enum SpillType { kSequentialSpill, kBatchedSpill };
-
- Spill(SpillType type);
-
~Spill();
- SpillType type() const;
-
void openForRead(uint64_t shuffleFileBufferSize);
bool hasNextPayload(uint32_t partitionId);
@@ -70,7 +64,6 @@ class Spill final {
std::unique_ptr<Payload> payload{};
};
- SpillType type_;
std::shared_ptr<gluten::MmapFileStream> is_;
std::list<PartitionPayload> partitionPayloads_{};
std::string spillFile_;
diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h
index 2e5ff58b6e..cfe07e5f2c 100644
--- a/cpp/core/shuffle/Utils.h
+++ b/cpp/core/shuffle/Utils.h
@@ -27,6 +27,9 @@
#include "utils/Compression.h"
+#include <utils/Exception.h>
+#include <utils/Timer.h>
+
namespace gluten {
using BinaryArrayLengthBufferType = uint32_t;
@@ -105,4 +108,396 @@ class MmapFileStream : public arrow::io::InputStream {
int64_t posRetain_ = 0;
};
+// Adopted from arrow::io::CompressedOutputStream. Rebuild compressor after
each `Flush()`.
+class ShuffleCompressedOutputStream : public arrow::io::OutputStream {
+ public:
+ /// \brief Create a compressed output stream wrapping the given output
stream.
+ static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>>
+ Make(arrow::util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
arrow::MemoryPool* pool) {
+ auto res = std::shared_ptr<ShuffleCompressedOutputStream>(new
ShuffleCompressedOutputStream(codec, raw, pool));
+ RETURN_NOT_OK(res->Init(codec));
+ return res;
+ }
+
+ arrow::Result<int64_t> Tell() const override {
+ return totalPos_;
+ }
+
+ arrow::Status Write(const void* data, int64_t nbytes) override {
+ ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+ if (nbytes == 0) {
+ return arrow::Status::OK();
+ }
+
+ freshCompressor_ = false;
+
+ int64_t flushTime = 0;
+ {
+ ScopedTimer timer(&compressTime_);
+ auto input = static_cast<const uint8_t*>(data);
+ while (nbytes > 0) {
+ int64_t input_len = nbytes;
+ int64_t output_len = compressed_->size() - compressedPos_;
+ uint8_t* output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(auto result, compressor_->Compress(input_len,
input, output_len, output));
+ compressedPos_ += result.bytes_written;
+
+ if (result.bytes_read == 0) {
+ // Not enough output, try to flush it and retry
+ if (compressedPos_ > 0) {
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+ output_len = compressed_->size() - compressedPos_;
+ output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(result, compressor_->Compress(input_len,
input, output_len, output));
+ compressedPos_ += result.bytes_written;
+ }
+ }
+ input += result.bytes_read;
+ nbytes -= result.bytes_read;
+ totalPos_ += result.bytes_read;
+ if (compressedPos_ == compressed_->size()) {
+ // Output buffer full, flush it
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+ }
+ if (result.bytes_read == 0) {
+ // Need to enlarge output buffer
+ RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+ }
+ }
+ }
+ compressTime_ -= flushTime;
+ flushTime_ += flushTime;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Flush() override {
+ ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+ if (freshCompressor_) {
+ // No data written, no need to flush
+ return arrow::Status::OK();
+ }
+
+ RETURN_NOT_OK(FinalizeCompression());
+ ARROW_ASSIGN_OR_RAISE(compressor_, codec_->MakeCompressor());
+ freshCompressor_ = true;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Close() override {
+ if (isOpen_) {
+ isOpen_ = false;
+ if (!freshCompressor_) {
+ RETURN_NOT_OK(FinalizeCompression());
+ }
+ // Do not close the underlying stream, it is the caller's responsibility.
+ }
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Abort() override {
+ if (isOpen_) {
+ isOpen_ = false;
+ return raw_->Abort();
+ }
+ return arrow::Status::OK();
+ }
+
+ bool closed() const override {
+ return !isOpen_;
+ }
+
+ int64_t compressTime() const {
+ return compressTime_;
+ }
+
+ int64_t flushTime() const {
+ return flushTime_;
+ }
+
+ private:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(ShuffleCompressedOutputStream);
+
+ ShuffleCompressedOutputStream(
+ arrow::util::Codec* codec,
+ const std::shared_ptr<OutputStream>& raw,
+ arrow::MemoryPool* pool)
+ : codec_(codec), raw_(raw), pool_(pool) {}
+
+ arrow::Status Init(arrow::util::Codec* codec) {
+ ARROW_ASSIGN_OR_RAISE(compressor_, codec->MakeCompressor());
+ ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize,
pool_));
+ compressedPos_ = 0;
+ isOpen_ = true;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status FlushCompressed(int64_t& flushTime) {
+ if (compressedPos_ > 0) {
+ ScopedTimer timer(&flushTime);
+ RETURN_NOT_OK(raw_->Write(compressed_->data(), compressedPos_));
+ compressedPos_ = 0;
+ }
+ return arrow::Status::OK();
+ }
+
+ arrow::Status FinalizeCompression() {
+ int64_t flushTime = 0;
+ {
+ ScopedTimer timer(&compressTime_);
+ while (true) {
+ // Try to end compressor
+ int64_t output_len = compressed_->size() - compressedPos_;
+ uint8_t* output = compressed_->mutable_data() + compressedPos_;
+ ARROW_ASSIGN_OR_RAISE(auto result, compressor_->End(output_len,
output));
+ compressedPos_ += result.bytes_written;
+
+ // Flush compressed output
+ RETURN_NOT_OK(FlushCompressed(flushTime));
+
+ if (result.should_retry) {
+ // Need to enlarge output buffer
+ RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+ } else {
+ // Done
+ break;
+ }
+ }
+ }
+ compressTime_ -= flushTime;
+ flushTime_ += flushTime;
+ return arrow::Status::OK();
+ }
+
+ // TODO: Support setting chunk size
+ // Write 64 KB compressed data at a time
+ static const int64_t kChunkSize = 64 * 1024;
+
+ arrow::util::Codec* codec_;
+ std::shared_ptr<OutputStream> raw_;
+ arrow::MemoryPool* pool_;
+
+ bool freshCompressor_{true};
+ std::shared_ptr<arrow::util::Compressor> compressor_;
+ std::shared_ptr<arrow::ResizableBuffer> compressed_;
+
+ bool isOpen_{false};
+ int64_t compressedPos_{0};
+ // Total number of bytes compressed
+ int64_t totalPos_{0};
+
+ // Time spent on compressing data. Flushing the compressed data into raw_
stream is not included.
+ int64_t compressTime_{0};
+ int64_t flushTime_{0};
+};
+
+class CompressedInputStream : public arrow::io::InputStream {
+ public:
+ static arrow::Result<std::shared_ptr<CompressedInputStream>>
+ Make(arrow::util::Codec* codec, const std::shared_ptr<InputStream>& raw,
arrow::MemoryPool* pool) {
+ std::shared_ptr<CompressedInputStream> res(new CompressedInputStream(raw,
pool));
+ RETURN_NOT_OK(res->Init(codec));
+ return res;
+ }
+
+ arrow::Status Init(arrow::util::Codec* codec) {
+ ARROW_ASSIGN_OR_RAISE(decompressor_, codec->MakeDecompressor());
+ fresh_decompressor_ = true;
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Close() override {
+ if (is_open_) {
+ is_open_ = false;
+ return raw_->Close();
+ } else {
+ return arrow::Status::OK();
+ }
+ }
+
+ arrow::Status Abort() override {
+ if (is_open_) {
+ is_open_ = false;
+ return raw_->Abort();
+ } else {
+ return arrow::Status::OK();
+ }
+ }
+
+ bool closed() const override {
+ return !is_open_;
+ }
+
+ arrow::Result<int64_t> Tell() const override {
+ return total_pos_;
+ }
+
+ arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ ScopedTimer timer(&decompressWallTime_);
+ auto out_data = reinterpret_cast<uint8_t*>(out);
+
+ int64_t total_read = 0;
+ bool decompressor_has_data = true;
+
+ while (nbytes - total_read > 0 && decompressor_has_data) {
+ total_read += ReadFromDecompressed(nbytes - total_read, out_data +
total_read);
+
+ if (nbytes == total_read) {
+ break;
+ }
+
+ // At this point, no more decompressed data remains, so we need to
+ // decompress more
+ RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data));
+ }
+
+ total_pos_ += total_read;
+ return total_read;
+ }
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
+ ARROW_ASSIGN_OR_RAISE(auto buf, arrow::AllocateResizableBuffer(nbytes,
pool_));
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes,
buf->mutable_data()));
+ RETURN_NOT_OK(buf->Resize(bytes_read));
+ return std::move(buf);
+ }
+
+ int64_t decompressTime() const {
+ return decompressWallTime_ - blockingTime_;
+ }
+
+ private:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(CompressedInputStream);
+
+ CompressedInputStream() = default;
+
+ CompressedInputStream(const std::shared_ptr<InputStream>& raw,
arrow::MemoryPool* pool)
+ : raw_(raw), pool_(pool), is_open_(true), compressed_pos_(0),
decompressed_pos_(0), total_pos_(0) {}
+
+ // Read compressed data if necessary
+ arrow::Status EnsureCompressedData() {
+ int64_t compressed_avail = compressed_ ? compressed_->size() -
compressed_pos_ : 0;
+ if (compressed_avail == 0) {
+ ScopedTimer timer(&blockingTime_);
+ // No compressed data available, read a full chunk
+ ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+ compressed_pos_ = 0;
+ }
+ return arrow::Status::OK();
+ }
+
+ // Decompress some data from the compressed_ buffer.
+ // Call this function only if the decompressed_ buffer is empty.
+ arrow::Status DecompressData() {
+ GLUTEN_CHECK(compressed_->data() != nullptr, "Compressed data is null");
+
+ int64_t decompress_size = kDecompressSize;
+
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(decompressed_,
AllocateResizableBuffer(decompress_size, pool_));
+ decompressed_pos_ = 0;
+
+ int64_t input_len = compressed_->size() - compressed_pos_;
+ const uint8_t* input = compressed_->data() + compressed_pos_;
+ int64_t output_len = decompressed_->size();
+ uint8_t* output = decompressed_->mutable_data();
+
+ ARROW_ASSIGN_OR_RAISE(auto result, decompressor_->Decompress(input_len,
input, output_len, output));
+ compressed_pos_ += result.bytes_read;
+ if (result.bytes_read > 0) {
+ fresh_decompressor_ = false;
+ }
+ if (result.bytes_written > 0 || !result.need_more_output || input_len ==
0) {
+ // StdMemoryAllocator does not allow resize to 0.
+ if (result.bytes_written > 0) {
+ RETURN_NOT_OK(decompressed_->Resize(result.bytes_written));
+ } else {
+ decompressed_.reset();
+ }
+ break;
+ }
+ GLUTEN_CHECK(result.bytes_written == 0, "Decompressor should return 0
bytes written");
+ // Need to enlarge output buffer
+ decompress_size *= 2;
+ }
+ return arrow::Status::OK();
+ }
+
+ // Read a given number of bytes from the decompressed_ buffer.
+ int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) {
+ int64_t readable = decompressed_ ? (decompressed_->size() -
decompressed_pos_) : 0;
+ int64_t read_bytes = std::min(readable, nbytes);
+
+ if (read_bytes > 0) {
+ memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes);
+ decompressed_pos_ += read_bytes;
+
+ if (decompressed_pos_ == decompressed_->size()) {
+ // Decompressed data is exhausted, release buffer
+ decompressed_.reset();
+ }
+ }
+
+ return read_bytes;
+ }
+
+ // Try to feed more data into the decompressed_ buffer.
+ arrow::Status RefillDecompressed(bool* has_data) {
+ // First try to read data from the decompressor
+ if (compressed_ && compressed_->size() != 0) {
+ if (decompressor_->IsFinished()) {
+ // We just went over the end of a previous compressed stream.
+ RETURN_NOT_OK(decompressor_->Reset());
+ fresh_decompressor_ = true;
+ }
+ RETURN_NOT_OK(DecompressData());
+ }
+ if (!decompressed_ || decompressed_->size() == 0) {
+ // Got nothing, need to read more compressed data
+ RETURN_NOT_OK(EnsureCompressedData());
+ if (compressed_pos_ == compressed_->size()) {
+ // No more data to decompress
+ if (!fresh_decompressor_ && !decompressor_->IsFinished()) {
+ return arrow::Status::IOError("Truncated compressed stream");
+ }
+ *has_data = false;
+ return arrow::Status::OK();
+ }
+ RETURN_NOT_OK(DecompressData());
+ }
+ *has_data = true;
+ return arrow::Status::OK();
+ }
+
+ std::shared_ptr<InputStream> raw() const {
+ return raw_;
+ }
+
+ // Read 64 KB compressed data at a time
+ static const int64_t kChunkSize = 64 * 1024;
+ // Decompress 1 MB at a time
+ static const int64_t kDecompressSize = 1024 * 1024;
+
+ std::shared_ptr<InputStream> raw_;
+ arrow::MemoryPool* pool_;
+
+ std::shared_ptr<arrow::util::Decompressor> decompressor_;
+ std::shared_ptr<arrow::Buffer> compressed_;
+
+ bool is_open_;
+ // Position in compressed buffer
+ int64_t compressed_pos_;
+ std::shared_ptr<arrow::ResizableBuffer> decompressed_;
+ // Position in decompressed buffer
+ int64_t decompressed_pos_;
+ // True if the decompressor hasn't read any data yet.
+ bool fresh_decompressor_;
+ // Total number of bytes decompressed
+ int64_t total_pos_;
+
+ int64_t blockingTime_{0};
+ int64_t decompressWallTime_{0};
+};
+
} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 4bb426d864..9695ce35c1 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -30,13 +30,23 @@ void RssPartitionWriter::init() {
}
arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
- // Push data and collect metrics.
- auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(),
bytesEvicted_.end(), 0LL);
+ if (rssOs_ != nullptr && !rssOs_->closed()) {
+ if (compressedOs_ != nullptr) {
+ RETURN_NOT_OK(compressedOs_->Close());
+ compressTime_ = compressedOs_->compressTime();
+ spillTime_ -= compressTime_;
+ }
+ RETURN_NOT_OK(rssOs_->Flush());
+ ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_],
rssOs_->Tell());
+ RETURN_NOT_OK(rssOs_->Close());
+ }
+
rssClient_->stop();
+
+ auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(),
bytesEvicted_.end(), 0LL);
// Populate metrics.
metrics->totalCompressTime += compressTime_;
metrics->totalEvictTime += spillTime_;
- metrics->totalWriteTime += writeTime_;
metrics->totalBytesEvicted += totalBytesEvicted;
metrics->totalBytesWritten += totalBytesEvicted;
metrics->partitionLengths = std::move(bytesEvicted_);
@@ -53,17 +63,42 @@ arrow::Status RssPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers,
- bool hasComplexType) {
- return doEvict(partitionId, std::move(inMemoryPayload), nullptr);
+ bool reuseBuffers) {
+ return doEvict(partitionId, std::move(inMemoryPayload));
}
-arrow::Status RssPartitionWriter::sortEvict(
- uint32_t partitionId,
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- std::shared_ptr<arrow::Buffer> compressed,
- bool isFinal) {
- return doEvict(partitionId, std::move(inMemoryPayload),
std::move(compressed));
+arrow::Status
+RssPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
+ ScopedTimer timer(&spillTime_);
+ if (lastEvictedPartitionId_ != partitionId) {
+ if (lastEvictedPartitionId_ != -1) {
+ GLUTEN_DCHECK(rssOs_ != nullptr && !rssOs_->closed(),
"RssPartitionWriterOutputStream should not be null");
+ if (compressedOs_ != nullptr) {
+ RETURN_NOT_OK(compressedOs_->Flush());
+ }
+ RETURN_NOT_OK(rssOs_->Flush());
+ ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_],
rssOs_->Tell());
+ RETURN_NOT_OK(rssOs_->Close());
+ }
+
+ rssOs_ =
+ std::make_shared<RssPartitionWriterOutputStream>(partitionId,
rssClient_.get(), options_.pushBufferMaxSize);
+ RETURN_NOT_OK(rssOs_->init());
+ if (codec_ != nullptr) {
+ ARROW_ASSIGN_OR_RAISE(
+ compressedOs_, ShuffleCompressedOutputStream::Make(codec_.get(),
rssOs_, arrow::default_memory_pool()));
+ }
+
+ lastEvictedPartitionId_ = partitionId;
+ }
+
+ rawPartitionLengths_[partitionId] = inMemoryPayload->rawSize();
+ if (compressedOs_ != nullptr) {
+ RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
+ } else {
+ RETURN_NOT_OK(inMemoryPayload->serialize(rssOs_.get()));
+ }
+ return arrow::Status::OK();
}
arrow::Status RssPartitionWriter::evict(uint32_t partitionId,
std::unique_ptr<BlockPayload> blockPayload, bool) {
@@ -74,16 +109,12 @@ arrow::Status RssPartitionWriter::evict(uint32_t
partitionId, std::unique_ptr<Bl
return arrow::Status::OK();
}
-arrow::Status RssPartitionWriter::doEvict(
- uint32_t partitionId,
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- std::shared_ptr<arrow::Buffer> compressed) {
+arrow::Status RssPartitionWriter::doEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
auto payloadType = codec_ ? Payload::Type::kCompressed :
Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload,
- inMemoryPayload->toBlockPayload(
- payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr,
std::move(compressed)));
+ inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_
? codec_.get() : nullptr, nullptr));
// Copy payload to arrow buffered os.
ARROW_ASSIGN_OR_RAISE(auto rssBufferOs,
arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize));
RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 2034c321e1..040c7546b9 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -26,6 +26,8 @@
namespace gluten {
+class RssPartitionWriterOutputStream;
+
class RssPartitionWriter final : public PartitionWriter {
public:
RssPartitionWriter(
@@ -41,14 +43,10 @@ class RssPartitionWriter final : public PartitionWriter {
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers,
- bool hasComplexType) override;
+ bool reuseBuffers) override;
- arrow::Status sortEvict(
- uint32_t partitionId,
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- std::shared_ptr<arrow::Buffer> compressed,
- bool isFinal) override;
+ arrow::Status sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
+ override;
arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload>
blockPayload, bool stop) override;
@@ -59,14 +57,91 @@ class RssPartitionWriter final : public PartitionWriter {
private:
void init();
- arrow::Status doEvict(
- uint32_t partitionId,
- std::unique_ptr<InMemoryPayload> inMemoryPayload,
- std::shared_ptr<arrow::Buffer> compressed);
+ arrow::Status doEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
inMemoryPayload);
std::shared_ptr<RssClient> rssClient_;
std::vector<int64_t> bytesEvicted_;
std::vector<int64_t> rawPartitionLengths_;
+
+ int32_t lastEvictedPartitionId_{-1};
+ std::shared_ptr<RssPartitionWriterOutputStream> rssOs_;
+ std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_;
+};
+
+class RssPartitionWriterOutputStream final : public arrow::io::OutputStream {
+ public:
+ RssPartitionWriterOutputStream(int32_t partitionId, RssClient* rssClient,
int64_t pushBufferSize)
+ : partitionId_(partitionId), rssClient_(rssClient),
bufferSize_(pushBufferSize) {}
+
+ arrow::Status init() {
+ ARROW_ASSIGN_OR_RAISE(pushBuffer_, arrow::AllocateBuffer(bufferSize_,
arrow::default_memory_pool()));
+ pushBufferPtr_ = pushBuffer_->mutable_data();
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Close() override {
+ RETURN_NOT_OK(Flush());
+ pushBuffer_.reset();
+ return arrow::Status::OK();
+ }
+
+ bool closed() const override {
+ return pushBuffer_ == nullptr;
+ }
+
+ arrow::Result<int64_t> Tell() const override {
+ return bytesEvicted_ + bufferPos_;
+ }
+
+ arrow::Status Write(const void* data, int64_t nbytes) override {
+ auto dataPtr = static_cast<const char*>(data);
+ if (nbytes < 0) {
+ return arrow::Status::Invalid("write count should be >= 0");
+ }
+ if (nbytes == 0) {
+ return arrow::Status::OK();
+ }
+
+ if (nbytes + bufferPos_ <= bufferSize_) {
+ std::memcpy(pushBufferPtr_ + bufferPos_, dataPtr, nbytes);
+ bufferPos_ += nbytes;
+ return arrow::Status::OK();
+ }
+
+ int64_t bytesWritten = 0;
+ while (bytesWritten < nbytes) {
+ auto remaining = nbytes - bytesWritten;
+ if (remaining <= bufferSize_ - bufferPos_) {
+ std::memcpy(pushBufferPtr_ + bufferPos_, dataPtr + bytesWritten,
remaining);
+ bufferPos_ += remaining;
+ return arrow::Status::OK();
+ }
+ auto toWrite = bufferSize_ - bufferPos_;
+ std::memcpy(pushBufferPtr_ + bufferPos_, dataPtr + bytesWritten,
toWrite);
+ bytesWritten += toWrite;
+ bufferPos_ += toWrite;
+ RETURN_NOT_OK(Flush());
+ }
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Flush() override {
+ if (bufferPos_ > 0) {
+ bytesEvicted_ += rssClient_->pushPartitionData(partitionId_,
reinterpret_cast<char*>(pushBufferPtr_), bufferPos_);
+ bufferPos_ = 0;
+ }
+ return arrow::Status::OK();
+ }
+
+ private:
+ int32_t partitionId_;
+ RssClient* rssClient_;
+ int64_t bufferSize_{kDefaultPushMemoryThreshold};
+
+ std::shared_ptr<arrow::Buffer> pushBuffer_;
+ uint8_t* pushBufferPtr_{nullptr};
+ int64_t bufferPos_{0};
+ int64_t bytesEvicted_{0};
};
} // namespace gluten
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 244e0b885a..1a48ca815d 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -96,6 +96,7 @@ struct WriterMetrics {
int64_t writeTime{0};
int64_t compressTime{0};
+ int64_t dataSize{0};
int64_t bytesSpilled{0};
int64_t bytesWritten{0};
};
@@ -238,6 +239,8 @@ void populateWriterMetrics(
if (splitTime > 0) {
metrics.splitTime += splitTime;
}
+ metrics.dataSize +=
+ std::accumulate(shuffleWriter->rawPartitionLengths().begin(),
shuffleWriter->rawPartitionLengths().end(), 0LL);
metrics.bytesWritten += shuffleWriter->totalBytesWritten();
metrics.bytesSpilled += shuffleWriter->totalBytesEvicted();
}
@@ -302,6 +305,9 @@ void runShuffle(
// Read and discard.
auto cb = iter->next();
}
+ // Call the dtor to collect the metrics.
+ iter.reset();
+
readerMetrics.decompressTime = reader->getDecompressTime();
readerMetrics.deserializeTime = reader->getDeserializeTime();
}
@@ -343,6 +349,8 @@ void updateBenchmarkMetrics(
state.counters["shuffle_split_time"] =
benchmark::Counter(splitTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+ state.counters["shuffle_data_size"] = benchmark::Counter(
+ writerMetrics.dataSize, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1024);
state.counters["shuffle_spilled_bytes"] = benchmark::Counter(
writerMetrics.bytesSpilled, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1024);
state.counters["shuffle_write_bytes"] = benchmark::Counter(
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index ee765f0219..ae2127a135 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -962,9 +962,8 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
bool reuseBuffers) {
if (!buffers.empty()) {
- auto payload = std::make_unique<InMemoryPayload>(numRows,
&isValidityBuffer_, std::move(buffers));
- RETURN_NOT_OK(
- partitionWriter_->hashEvict(partitionId, std::move(payload),
Evict::kCache, reuseBuffers, hasComplexType_));
+ auto payload = std::make_unique<InMemoryPayload>(numRows,
&isValidityBuffer_, std::move(buffers), hasComplexType_);
+ RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload),
Evict::kCache, reuseBuffers));
}
return arrow::Status::OK();
}
@@ -1373,9 +1372,10 @@ arrow::Result<int64_t>
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
for (auto& item : pidToSize) {
auto pid = item.first;
ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
- auto payload = std::make_unique<InMemoryPayload>(item.second,
&isValidityBuffer_, std::move(buffers));
+ auto payload =
+ std::make_unique<InMemoryPayload>(item.second, &isValidityBuffer_,
std::move(buffers), hasComplexType_);
metrics_.totalBytesToEvict += payload->rawSize();
- RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload),
Evict::kSpill, false, hasComplexType_));
+ RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload),
Evict::kSpill, false));
evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
if (evicted >= size) {
break;
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 5a379b8656..a5c727f602 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -19,6 +19,8 @@
#include <arrow/array/array_binary.h>
#include <arrow/io/buffered.h>
+#include <arrow/io/compressed.h>
+#include <velox/common/caching/AsyncDataCache.h>
#include "memory/VeloxColumnarBatch.h"
#include "shuffle/GlutenByteStream.h"
@@ -41,14 +43,16 @@
using namespace facebook::velox;
namespace gluten {
-
namespace {
+constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize;
struct BufferViewReleaser {
BufferViewReleaser() : BufferViewReleaser(nullptr) {}
+
BufferViewReleaser(std::shared_ptr<arrow::Buffer> arrowBuffer) :
bufferReleaser_(std::move(arrowBuffer)) {}
void addRef() const {}
+
void release() const {}
private:
@@ -281,7 +285,6 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, pool);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
-
} // namespace
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
@@ -384,32 +387,40 @@
VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
codec_(codec),
rowType_(rowType),
batchSize_(batchSize),
- arrowPool_(memoryPool),
veloxPool_(veloxPool),
deserializeTime_(deserializeTime),
decompressTime_(decompressTime) {
- GLUTEN_ASSIGN_OR_THROW(in_,
arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
+ if (codec_ != nullptr) {
+ GLUTEN_ASSIGN_OR_THROW(in_, CompressedInputStream::Make(codec_.get(),
std::move(in), memoryPool));
+ } else {
+ GLUTEN_ASSIGN_OR_THROW(in_,
arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
+ }
+}
+
+VeloxSortShuffleReaderDeserializer::~VeloxSortShuffleReaderDeserializer() {
+ if (auto in = std::dynamic_pointer_cast<CompressedInputStream>(in_)) {
+ decompressTime_ += in->decompressTime();
+ }
}
std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
if (reachedEos_) {
- if (cachedRows_ > 0) {
- return deserializeToBatch();
- }
return nullptr;
}
- if (cachedRows_ >= batchSize_) {
- return deserializeToBatch();
+ if (rowBuffer_ == nullptr) {
+ rowBuffer_ = AlignedBuffer::allocate<char>(kMaxReadBufferSize, veloxPool_);
+ rowBufferPtr_ = rowBuffer_->asMutable<char>();
+ data_.reserve(batchSize_);
}
- while (cachedRows_ < batchSize_) {
- uint32_t numRows = 0;
- GLUTEN_ASSIGN_OR_THROW(
- auto arrowBuffers,
- BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows,
deserializeTime_, decompressTime_));
+ if (lastRowSize_ != 0) {
+ readNextRow();
+ }
- if (arrowBuffers.empty()) {
+ while (cachedRows_ < batchSize_) {
+ GLUTEN_ASSIGN_OR_THROW(auto bytes, in_->Read(sizeof(RowSizeType),
&lastRowSize_));
+ if (bytes == 0) {
reachedEos_ = true;
if (cachedRows_ > 0) {
return deserializeToBatch();
@@ -417,80 +428,35 @@ std::shared_ptr<ColumnarBatch>
VeloxSortShuffleReaderDeserializer::next() {
return nullptr;
}
- if (numRows > 0) {
- auto buffer = std::move(arrowBuffers[0]);
- cachedInputs_.emplace_back(numRows,
wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
- cachedRows_ += numRows;
- } else {
- // numRows = 0 indicates that we read a segment of a large row.
- readLargeRow(arrowBuffers);
+ GLUTEN_CHECK(
+ lastRowSize_ <= kMaxReadBufferSize, "Row size exceeds max read buffer
size: " + std::to_string(lastRowSize_));
+
+ if (lastRowSize_ + bytesRead_ > kMaxReadBufferSize) {
+ return deserializeToBatch();
}
+ readNextRow();
}
+
return deserializeToBatch();
}
std::shared_ptr<ColumnarBatch>
VeloxSortShuffleReaderDeserializer::deserializeToBatch() {
ScopedTimer timer(&deserializeTime_);
- std::vector<std::string_view> data;
- data.reserve(std::min(cachedRows_, batchSize_));
-
- uint32_t readRows = 0;
- auto cur = cachedInputs_.begin();
- while (readRows < batchSize_ && cur != cachedInputs_.end()) {
- auto buffer = cur->second;
- const auto* rawBuffer = buffer->as<char>();
- while (rowOffset_ < cur->first && readRows < batchSize_) {
- auto rowSize = *(reinterpret_cast<const RowSizeType*>(rawBuffer +
byteOffset_)) - sizeof(RowSizeType);
- byteOffset_ += sizeof(RowSizeType);
- data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize));
- byteOffset_ += rowSize;
- ++rowOffset_;
- ++readRows;
- }
- if (rowOffset_ == cur->first) {
- rowOffset_ = 0;
- byteOffset_ = 0;
- ++cur;
- }
- }
- cachedRows_ -= readRows;
- auto rowVector = facebook::velox::row::CompactRow::deserialize(data,
rowType_, veloxPool_);
- // Free memory.
- auto iter = cachedInputs_.begin();
- while (iter++ != cur) {
- cachedInputs_.pop_front();
- }
+
+ auto rowVector = facebook::velox::row::CompactRow::deserialize(data_,
rowType_, veloxPool_);
+
+ cachedRows_ = 0;
+ bytesRead_ = 0;
+ data_.resize(0);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
-void
VeloxSortShuffleReaderDeserializer::readLargeRow(std::vector<std::shared_ptr<arrow::Buffer>>&
arrowBuffers) {
- // Cache the read segment.
- std::vector<std::shared_ptr<arrow::Buffer>> buffers;
- auto rowSize =
*reinterpret_cast<RowSizeType*>(const_cast<uint8_t*>(arrowBuffers[0]->data()));
- RowSizeType bufferSize = arrowBuffers[0]->size();
- buffers.emplace_back(std::move(arrowBuffers[0]));
- // Read and cache the remaining segments.
- uint32_t numRows;
- while (bufferSize < rowSize) {
- GLUTEN_ASSIGN_OR_THROW(
- arrowBuffers,
- BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows,
deserializeTime_, decompressTime_));
- VELOX_DCHECK_EQ(numRows, 0);
- bufferSize += arrowBuffers[0]->size();
- buffers.emplace_back(std::move(arrowBuffers[0]));
- }
- VELOX_CHECK_EQ(bufferSize, rowSize);
- // Merge all segments.
- GLUTEN_ASSIGN_OR_THROW(std::shared_ptr<arrow::Buffer> rowBuffer,
arrow::AllocateBuffer(rowSize, arrowPool_));
- RowSizeType bytes = 0;
- auto* dst = rowBuffer->mutable_data();
- for (const auto& buffer : buffers) {
- VELOX_DCHECK_NOT_NULL(buffer);
- gluten::fastCopy(dst + bytes, buffer->data(), buffer->size());
- bytes += buffer->size();
- }
- cachedInputs_.emplace_back(1, wrapInBufferViewAsOwner(rowBuffer->data(),
rowSize, rowBuffer));
- cachedRows_++;
+void VeloxSortShuffleReaderDeserializer::readNextRow() {
+ GLUTEN_THROW_NOT_OK(in_->Read(lastRowSize_, rowBufferPtr_ + bytesRead_));
+ data_.push_back(std::string_view(rowBufferPtr_ + bytesRead_, lastRowSize_));
+ bytesRead_ += lastRowSize_;
+ lastRowSize_ = 0;
+ ++cachedRows_;
}
class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public
facebook::velox::GlutenByteInputStream {
@@ -705,5 +671,4 @@ int64_t VeloxShuffleReader::getDecompressTime() const {
int64_t VeloxShuffleReader::getDeserializeTime() const {
return factory_->getDeserializeTime();
}
-
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h
b/cpp/velox/shuffle/VeloxShuffleReader.h
index d7aa145ca1..29ca4d218b 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -79,29 +79,33 @@ class VeloxSortShuffleReaderDeserializer final : public
ColumnarBatchIterator {
int64_t& deserializeTime,
int64_t& decompressTime);
+ ~VeloxSortShuffleReaderDeserializer() override;
+
std::shared_ptr<ColumnarBatch> next() override;
private:
std::shared_ptr<ColumnarBatch> deserializeToBatch();
- void readLargeRow(std::vector<std::shared_ptr<arrow::Buffer>>& arrowBuffers);
+ void readNextRow();
- std::shared_ptr<arrow::io::InputStream> in_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::RowTypePtr rowType_;
uint32_t batchSize_;
- arrow::MemoryPool* arrowPool_;
facebook::velox::memory::MemoryPool* veloxPool_;
int64_t& deserializeTime_;
int64_t& decompressTime_;
- std::list<std::pair<uint32_t, facebook::velox::BufferPtr>> cachedInputs_;
+ facebook::velox::BufferPtr rowBuffer_{nullptr};
+ char* rowBufferPtr_{nullptr};
+ uint32_t bytesRead_{0};
+ uint32_t lastRowSize_{0};
+ std::vector<std::string_view> data_;
+
+ std::shared_ptr<arrow::io::InputStream> in_;
+
uint32_t cachedRows_{0};
bool reachedEos_{false};
-
- uint32_t rowOffset_{0};
- size_t byteOffset_{0};
};
class VeloxRssSortShuffleReaderDeserializer : public ColumnarBatchIterator {
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 52a5240186..6c78752664 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -17,13 +17,13 @@
#include "shuffle/VeloxSortShuffleWriter.h"
-#include <arrow/io/memory.h>
-
-#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
+#include "shuffle/RadixSort.h"
#include "utils/Common.h"
#include "utils/Timer.h"
+#include <arrow/io/memory.h>
+
namespace gluten {
namespace {
@@ -108,14 +108,8 @@ arrow::Status VeloxSortShuffleWriter::init() {
// In Spark, sortedBuffer_ memory and compressionBuffer_ memory are
pre-allocated and counted into executor
// memory overhead. To align with Spark, we use arrow::default_memory_pool()
to avoid counting these memory in Gluten.
ARROW_ASSIGN_OR_RAISE(
- sortedBuffer_, arrow::AllocateBuffer(options_.sortEvictBufferSize,
arrow::default_memory_pool()));
- rawBuffer_ = sortedBuffer_->mutable_data();
- auto compressedBufferLength = partitionWriter_->getCompressedBufferLength(
- {std::make_shared<arrow::Buffer>(rawBuffer_,
options_.sortEvictBufferSize)});
- if (compressedBufferLength.has_value()) {
- ARROW_ASSIGN_OR_RAISE(
- compressionBuffer_, arrow::AllocateBuffer(*compressedBufferLength,
arrow::default_memory_pool()));
- }
+ sortedBuffer_, arrow::AllocateBuffer(options_.diskWriteBufferSize,
arrow::default_memory_pool()));
+ sortedBufferPtr_ = sortedBuffer_->mutable_data();
return arrow::Status::OK();
}
@@ -123,9 +117,6 @@ void VeloxSortShuffleWriter::initRowType(const
facebook::velox::RowVectorPtr& rv
if (UNLIKELY(!rowType_)) {
rowType_ = facebook::velox::asRowType(rv->type());
fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_);
- if (fixedRowSize_) {
- *fixedRowSize_ += sizeof(RowSizeType);
- }
}
}
@@ -168,17 +159,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const
facebook::velox::RowVectorPtr
facebook::velox::row::CompactRow row(vector);
- if (!fixedRowSize_) {
+ if (fixedRowSize_.has_value()) {
+ rowSize_.resize(inputRows, fixedRowSize_.value() + sizeof(RowSizeType));
+ } else {
rowSize_.resize(inputRows);
rowSizePrefixSum_.resize(inputRows + 1);
rowSizePrefixSum_[0] = 0;
for (auto i = 0; i < inputRows; ++i) {
- auto rowSize = row.rowSize(i) + sizeof(RowSizeType);
- rowSize_[i] = rowSize;
- rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize;
+ rowSize_[i] = row.rowSize(i);
+ rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize_[i] +
sizeof(RowSizeType);
}
- } else {
- rowSize_.resize(inputRows, *fixedRowSize_);
}
facebook::velox::vector_size_t rowOffset = 0;
@@ -186,7 +176,8 @@ arrow::Status VeloxSortShuffleWriter::insert(const
facebook::velox::RowVectorPtr
auto remainingRows = inputRows - rowOffset;
auto rows = maxRowsToInsert(rowOffset, remainingRows);
if (rows == 0) {
- auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() :
rowSize_[rowOffset];
+ auto minSizeRequired =
+ (fixedRowSize_.has_value() ? fixedRowSize_.value() :
rowSize_[rowOffset]) + sizeof(RowSizeType);
acquireNewBuffer(static_cast<uint64_t>(memLimit), minSizeRequired);
rows = maxRowsToInsert(rowOffset, remainingRows);
ARROW_RETURN_IF(
@@ -215,7 +206,7 @@ void VeloxSortShuffleWriter::insertRows(
// size(RowSize) | bytes
memcpy(currentPage_ + pageCursor_, &rowSize_[row], sizeof(RowSizeType));
offsets[i] = pageCursor_ + sizeof(RowSizeType);
- pageCursor_ += rowSize_[row];
+ pageCursor_ += rowSize_[row] + sizeof(RowSizeType);
VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
}
compact.serialize(offset, size, offsets.data(), currentPage_);
@@ -288,61 +279,55 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
// Serialize [begin, end)
int64_t offset = 0;
char* addr;
- uint32_t size;
+ uint32_t recordSize;
auto index = begin;
while (index < end) {
auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
addr = pageAddresses_[pageIndex.first] + pageIndex.second;
- size = *(reinterpret_cast<RowSizeType*>(addr));
- if (offset + size > options_.sortEvictBufferSize && offset > 0) {
+ recordSize = *(reinterpret_cast<RowSizeType*>(addr)) + sizeof(RowSizeType);
+ if (offset + recordSize > options_.diskWriteBufferSize && offset > 0) {
sortTime.stop();
- RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin,
rawBuffer_, offset));
+ RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_,
offset));
sortTime.start();
begin = index;
offset = 0;
}
- if (size > static_cast<uint32_t>(options_.sortEvictBufferSize)) {
+ if (recordSize > static_cast<uint32_t>(options_.diskWriteBufferSize)) {
// Split large rows.
sortTime.stop();
RowSizeType bytes = 0;
auto* buffer = reinterpret_cast<uint8_t*>(addr);
- while (bytes < size) {
- auto rawLength =
std::min<RowSizeType>((uint32_t)options_.sortEvictBufferSize, size - bytes);
+ while (bytes < recordSize) {
+ auto rawLength =
std::min<RowSizeType>((uint32_t)options_.diskWriteBufferSize, recordSize -
bytes);
// Use numRows = 0 to represent a part of row.
- RETURN_NOT_OK(evictPartitionInternal(partitionId, 0, buffer + bytes,
rawLength));
+ RETURN_NOT_OK(evictPartitionInternal(partitionId, buffer + bytes,
rawLength));
bytes += rawLength;
}
begin++;
sortTime.start();
} else {
// Copy small rows.
- gluten::fastCopy(rawBuffer_ + offset, addr, size);
- offset += size;
+ gluten::fastCopy(sortedBufferPtr_ + offset, addr, recordSize);
+ offset += recordSize;
}
index++;
}
sortTime.stop();
if (offset > 0) {
VELOX_CHECK(index > begin);
- RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin,
rawBuffer_, offset));
+ RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_,
offset));
}
sortTime_ += sortTime.realTimeUsed();
return arrow::Status::OK();
}
-arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(
- uint32_t partitionId,
- int32_t numRows,
- uint8_t* buffer,
- int64_t rawLength) {
+arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(uint32_t
partitionId, uint8_t* buffer, int64_t rawLength) {
VELOX_CHECK(rawLength > 0);
auto payload = std::make_unique<InMemoryPayload>(
- numRows,
- nullptr,
-
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
rawLength)});
+ 0, nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
rawLength)});
updateSpillMetrics(payload);
- RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload),
compressionBuffer_, stopped_));
+ RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload),
stopped_));
return arrow::Status::OK();
}
@@ -354,9 +339,10 @@ facebook::velox::vector_size_t
VeloxSortShuffleWriter::maxRowsToInsert(
return 0;
}
auto remainingBytes = pages_.back()->size() - pageCursor_;
- if (fixedRowSize_) {
+ if (fixedRowSize_.has_value()) {
return std::min(
- static_cast<facebook::velox::vector_size_t>(remainingBytes /
(fixedRowSize_.value())), remainingRows);
+ static_cast<facebook::velox::vector_size_t>(remainingBytes /
(fixedRowSize_.value() + sizeof(RowSizeType))),
+ remainingRows);
}
auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
auto bytesWritten = rowSizePrefixSum_[offset];
@@ -445,7 +431,7 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const {
void VeloxSortShuffleWriter::allocateMinimalArray() {
auto array = facebook::velox::AlignedBuffer::allocate<char>(
- options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
+ options_.initialSortBufferSize * sizeof(uint64_t), veloxPool_.get());
setUpArray(std::move(array));
}
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 42726a663b..cb8b3ba557 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -17,16 +17,13 @@
#pragma once
-#include "shuffle/RadixSort.h"
#include "shuffle/VeloxShuffleWriter.h"
#include <arrow/status.h>
-#include <map>
#include <vector>
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/row/CompactRow.h"
-#include "velox/vector/BaseVector.h"
namespace gluten {
@@ -80,7 +77,7 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);
- arrow::Status evictPartitionInternal(uint32_t partitionId, int32_t numRows,
uint8_t* buffer, int64_t rawLength);
+ arrow::Status evictPartitionInternal(uint32_t partitionId, uint8_t* buffer,
int64_t rawLength);
facebook::velox::vector_size_t maxRowsToInsert(
facebook::velox::vector_size_t offset,
@@ -113,8 +110,7 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
uint32_t currenPageSize_;
std::unique_ptr<arrow::Buffer> sortedBuffer_;
- uint8_t* rawBuffer_;
- std::shared_ptr<arrow::Buffer> compressionBuffer_{nullptr};
+ uint8_t* sortedBufferPtr_;
// Row ID -> Partition ID
// subscript: The index of row in the current input RowVector
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index e760a469b1..fac503ca70 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -69,17 +69,20 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
for (const auto& compression : compressions) {
- for (const auto compressionBufferSize : {4, 56, 32 * 1024}) {
- for (auto useRadixSort : {true, false}) {
- params.push_back(ShuffleTestParams{
- .shuffleWriterType = ShuffleWriterType::kSortShuffle,
- .partitionWriterType = PartitionWriterType::kLocal,
- .compressionType = compression,
- .compressionBufferSize = compressionBufferSize,
- .useRadixSort = useRadixSort});
+ for (const auto partitionWriterType : {PartitionWriterType::kLocal,
PartitionWriterType::kRss}) {
+ for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
+ for (auto useRadixSort : {true, false}) {
+ params.push_back(ShuffleTestParams{
+ .shuffleWriterType = ShuffleWriterType::kSortShuffle,
+ .partitionWriterType = partitionWriterType,
+ .compressionType = compression,
+ .diskWriteBufferSize = diskWriteBufferSize,
+ .useRadixSort = useRadixSort});
+ }
}
}
params.push_back(ShuffleTestParams{ShuffleWriterType::kRssSortShuffle,
PartitionWriterType::kRss, compression});
+
for (const auto compressionThreshold : compressionThresholds) {
for (const auto mergeBufferSize : mergeBufferSizes) {
params.push_back(ShuffleTestParams{
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index 4fcab8f242..7a9c6c8971 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -68,14 +68,14 @@ struct ShuffleTestParams {
arrow::Compression::type compressionType;
int32_t compressionThreshold{0};
int32_t mergeBufferSize{0};
- int32_t compressionBufferSize{0};
+ int32_t diskWriteBufferSize{0};
bool useRadixSort{false};
std::string toString() const {
std::ostringstream out;
out << "shuffleWriterType = " << shuffleWriterType << ",
partitionWriterType = " << partitionWriterType
<< ", compressionType = " << compressionType << ",
compressionThreshold = " << compressionThreshold
- << ", mergeBufferSize = " << mergeBufferSize << ",
compressionBufferSize = " << compressionBufferSize
+ << ", mergeBufferSize = " << mergeBufferSize << ",
diskWriteBufferSize = " << diskWriteBufferSize
<< ", useRadixSort = " << (useRadixSort ? "true" : "false");
return out.str();
}
@@ -261,7 +261,7 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
ShuffleTestParams params = GetParam();
shuffleWriterOptions_.useRadixSort = params.useRadixSort;
- shuffleWriterOptions_.sortEvictBufferSize = params.compressionBufferSize;
+ shuffleWriterOptions_.diskWriteBufferSize = params.diskWriteBufferSize;
partitionWriterOptions_.compressionType = params.compressionType;
switch (partitionWriterOptions_.compressionType) {
case arrow::Compression::UNCOMPRESSED:
@@ -365,7 +365,7 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
std::move(codec),
veloxCompressionType,
rowType,
- std::numeric_limits<int32_t>::max(),
+ kDefaultBatchSize,
kDefaultReadBufferSize,
defaultArrowMemoryPool().get(),
pool_,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]