marin-ma commented on code in PR #5786:
URL: https://github.com/apache/incubator-gluten/pull/5786#discussion_r1604605226


##########
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc:
##########
@@ -154,83 +127,71 @@ arrow::Status 
VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId, 
facebook::velox::RowTypePtr* rowTypePtr) {
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId) {
   int64_t rawSize = batch_->size();
   bufferOutputStream_->seekp(0);
   batch_->flush(bufferOutputStream_.get());
   auto buffer = bufferOutputStream_->getBuffer();
   RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, 
buffer->as<char>(), buffer->size()));
   batch_ = 
std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), 
serde_.get());
-  batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_);
+  batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
   return arrow::Status::OK();
 }
 
 arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t 
partitionId) {
-  int32_t rowNum = 0;
-  const int32_t maxBatchNum = options_.bufferSize;
-  auto rowTypePtr = std::static_pointer_cast<const 
facebook::velox::RowType>(rowType_.value());
+  int32_t accumulatedRows = 0;
+  const int32_t maxRowsPerBatch = options_.bufferSize;
 
   if (options_.partitioning != Partitioning::kSingle) {
     if (auto it = rowVectorIndexMap_.find(partitionId); it != 
rowVectorIndexMap_.end()) {
-      auto rowVectorIndex = it->second;
-      const int32_t outputSize = rowVectorIndex.size();
+      const auto& rowIndices = it->second;
+      VELOX_DCHECK(!rowIndices.empty())
 
-      std::map<int32_t, std::vector<facebook::velox::IndexRange>> 
groupedIndices;
-      std::map<int32_t, int64_t> groupedSize;
+      size_t idx = 0;
+      const auto outputSize = rowIndices.size();
+      while (idx < outputSize) {
+        auto combinedRowIndex = rowIndices[idx];
+        auto inputVectorIndex = static_cast<int32_t>(combinedRowIndex >> 32);
+        auto startRow = static_cast<int32_t>(combinedRowIndex & 0xFFFFFFFFLL);
 
-      int32_t tempVectorIndex = -1;
-      int32_t baseRowIndex = -1;
-      int32_t tempRowIndex = -1;
-      int32_t size = 1;
-      for (int start = 0; start < outputSize; start++) {
-        const int64_t rowVector = rowVectorIndex[start];
-        const int32_t vectorIndex = static_cast<int32_t>(rowVector >> 32);
-        const int32_t rowIndex = static_cast<int32_t>(rowVector & 
0xFFFFFFFFLL);
-        if (tempVectorIndex == -1) {
-          tempVectorIndex = vectorIndex;
-          baseRowIndex = rowIndex;
-          tempRowIndex = rowIndex;
-        } else {
-          if (vectorIndex == tempVectorIndex && rowIndex == tempRowIndex + 1) {
-            size += 1;
-            tempRowIndex = rowIndex;
+        int32_t numRowsInRange = 1;
+        std::vector<facebook::velox::IndexRange> groupedIndices;

Review Comment:
   Post reply here:
   
   > @kerwin-zk I don't think so. Sort shuffle applies when the number of 
shuffle partitions is large, so there won't be many rows for each partition in 
an input RowVector; therefore, the actual size of groupedIndices should be 
small. Reserving maxRowsPerBatch up front could waste memory.
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to