This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f7971f6c16 [VL] Sort shuffle writer use vectorized c2r (#6782)
f7971f6c16 is described below
commit f7971f6c16d8ae51bed984632afa735fe4c7c585
Author: Rong Ma <[email protected]>
AuthorDate: Thu Nov 7 09:53:54 2024 +0800
[VL] Sort shuffle writer use vectorized c2r (#6782)
---
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 29 ++++++++++++++++++-----------
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 9 +++++++--
2 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index f87eaabb56..ab37c0be74 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -180,7 +180,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
rowSize_.resize(inputRows, *fixedRowSize_);
}
- uint32_t rowOffset = 0;
+ facebook::velox::vector_size_t rowOffset = 0;
while (rowOffset < inputRows) {
auto remainingRows = inputRows - rowOffset;
auto rows = maxRowsToInsert(rowOffset, remainingRows);
@@ -201,18 +201,23 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
return arrow::Status::OK();
}
-void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) {
+void VeloxSortShuffleWriter::insertRows(
+ facebook::velox::row::CompactRow& compact,
+ facebook::velox::vector_size_t offset,
+ facebook::velox::vector_size_t size) {
VELOX_CHECK(!pages_.empty());
- for (auto i = offset; i < offset + rows; ++i) {
- auto pid = row2Partition_[i];
+ std::vector<size_t> offsets(size);
+ for (auto i = 0; i < size; ++i) {
+ auto row = offset + i;
+ auto pid = row2Partition_[row];
arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);
// size(RowSize) | bytes
- memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType));
- pageCursor_ += sizeof(RowSizeType);
- auto size = row.serialize(i, currentPage_ + pageCursor_);
- pageCursor_ += size;
+ memcpy(currentPage_ + pageCursor_, &rowSize_[row], sizeof(RowSizeType));
+ offsets[i] = pageCursor_ + sizeof(RowSizeType);
+ pageCursor_ += rowSize_[row];
VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
}
+ compact.serialize(offset, size, offsets.data(), currentPage_);
}
arrow::Status VeloxSortShuffleWriter::maybeSpill(uint32_t nextRows) {
@@ -337,19 +342,21 @@ VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, int32_t numRows, u
return arrow::Status::OK();
}
-uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) {
+facebook::velox::vector_size_t VeloxSortShuffleWriter::maxRowsToInsert(
+ facebook::velox::vector_size_t offset,
+ facebook::velox::vector_size_t remainingRows) {
// Check how many rows can be handled.
if (pages_.empty()) {
return 0;
}
auto remainingBytes = pages_.back()->size() - pageCursor_;
if (fixedRowSize_) {
- return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
+ return std::min((facebook::velox::vector_size_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
}
auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
auto bytesWritten = rowSizePrefixSum_[offset];
auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten);
- return iter - beginIter;
+ return (facebook::velox::vector_size_t)(iter - beginIter);
}
void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired) {
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 531ed1fe3e..5b8cff452d 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -69,7 +69,10 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
arrow::Status insert(const facebook::velox::RowVectorPtr& vector, int64_t memLimit);
- void insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows);
+ void insertRows(
+ facebook::velox::row::CompactRow& compact,
+ facebook::velox::vector_size_t offset,
+ facebook::velox::vector_size_t size);
arrow::Status maybeSpill(uint32_t nextRows);
@@ -79,7 +82,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
arrow::Status evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength);
- uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);
+ facebook::velox::vector_size_t maxRowsToInsert(
+ facebook::velox::vector_size_t offset,
+ facebook::velox::vector_size_t remainingRows);
void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]