This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c5c43c1a6f [GLUTEN-11569][VL] Following #11527, fix the error calling #noMoreSplits on cuDF value stream nodes (#11572)
c5c43c1a6f is described below

commit c5c43c1a6fc8186c6ffaeb46c1e94fd7bbb8d076
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Feb 9 14:29:07 2026 +0000

    [GLUTEN-11569][VL] Following #11527, fix the error calling #noMoreSplits on cuDF value stream nodes (#11572)
---
 cpp/velox/compute/VeloxPlanConverter.cc       |  1 +
 cpp/velox/compute/VeloxRuntime.cc             | 11 ++++++++---
 cpp/velox/compute/WholeStageResultIterator.cc |  6 +++---
 cpp/velox/compute/WholeStageResultIterator.h  |  2 +-
 cpp/velox/substrait/SubstraitToVeloxPlan.cc   |  7 ++++---
 cpp/velox/substrait/SubstraitToVeloxPlan.h    | 13 +++++++++++--
 6 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index 4764fad382..627bd396b7 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -46,6 +46,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
   using SubstraitFileFormatCase = 
::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;
 
   auto splitInfo = std::make_shared<SplitInfo>();
+  splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;
   splitInfo->paths.reserve(fileList.size());
   splitInfo->starts.reserve(fileList.size());
   splitInfo->lengths.reserve(fileList.size());
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index 69d21b4a57..e88cb43a91 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -137,11 +137,16 @@ void VeloxRuntime::getInfoAndIds(
     // 1. Streams follow "iterator:<idx>" in the substrait plan;
     // 2. Files follow the traversal order in the plan node tree.
     // FIXME: Why we didn't have a unified design?
-    if (splitInfo->isStream) {
+    switch (splitInfo->leafType) {
+    case SplitInfo::LeafType::SPLIT_AWARE_STREAM:
       
streamIds.emplace_back(ValueStreamConnectorFactory::nodeIdOf(streamIdx++));
-    } else {
-      scanInfos.emplace_back(splitInfo);
+break;
+      case SplitInfo::LeafType::TABLE_SCAN:
+        scanInfos.emplace_back(splitInfo);
       scanIds.emplace_back(leafPlanNodeId);
+break;
+      case SplitInfo::LeafType::TRIVIAL_LEAF:
+break;
     }
   }
 }
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 9849075e1f..185dc5fd5e 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -359,7 +359,7 @@ void WholeStageResultIterator::constructPartitionColumns(
 }
 
 void WholeStageResultIterator::addIteratorSplits(const 
std::vector<std::shared_ptr<ResultIterator>>& inputIterators) {
-  GLUTEN_CHECK(!allSplitsAdded, "Method addIteratorSplits should not be called 
since all splits has been added to the Velox task.");
+  GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be 
called since all splits has been added to the Velox task.");
   // Create IteratorConnectorSplit for each iterator
   for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) {
     if (inputIterators[i] == nullptr) {
@@ -373,7 +373,7 @@ void WholeStageResultIterator::addIteratorSplits(const 
std::vector<std::shared_p
 }
 
 void WholeStageResultIterator::noMoreSplits() {
-  if (allSplitsAdded) {
+  if (allSplitsAdded_) {
     return;
   }
   // Mark no more splits for all scan nodes
@@ -391,7 +391,7 @@ void WholeStageResultIterator::noMoreSplits() {
   for (const auto& streamId : streamIds_) {
     task_->noMoreSplits(streamId);
   }
-  allSplitsAdded = true;
+  allSplitsAdded_ = true;
 }
 
 void WholeStageResultIterator::collectMetrics() {
diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h
index 401cff06da..85cb18fd25 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -139,7 +139,7 @@ class WholeStageResultIterator : public 
SplitAwareColumnarBatchIterator {
   std::vector<std::shared_ptr<SplitInfo>> scanInfos_;
   std::vector<facebook::velox::core::PlanNodeId> streamIds_;
   std::vector<std::vector<facebook::velox::exec::Split>> splits_;
-  bool allSplitsAdded = false;
+  bool allSplitsAdded_ = false;
 
   int64_t loadLazyVectorTime_ = 0;
 };
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 0085c553ba..b543dfa8ba 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1323,7 +1323,7 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::constructValueStreamNode(
 
   // Mark this as a stream-based split
   auto splitInfo = std::make_shared<SplitInfo>();
-  splitInfo->isStream = true;
+  splitInfo->leafType = SplitInfo::LeafType::SPLIT_AWARE_STREAM;
   splitInfoMap_[tableScanNode->id()] = splitInfo;
 
   return tableScanNode;
@@ -1360,7 +1360,7 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::constructCudfValueStreamNode(
   auto node = std::make_shared<CudfValueStreamNode>(nextPlanNodeId(), 
outputType, std::move(iterator));
 
   auto splitInfo = std::make_shared<SplitInfo>();
-  splitInfo->isStream = true;
+  splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF;
   splitInfoMap_[node->id()] = splitInfo;
   return node;
 }
@@ -1381,7 +1381,7 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::constructValuesNode(
   }
   auto node = 
std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(), 
std::move(rowVectors));
   auto splitInfo = std::make_shared<SplitInfo>();
-  splitInfo->isStream = true;
+  splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF;
   splitInfoMap_[node->id()] = splitInfo;
   return node;
 }
@@ -1412,6 +1412,7 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 
   // Otherwise, will create TableScan node for ReadRel.
   auto splitInfo = std::make_shared<SplitInfo>();
+  splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;
   if (!validationMode_) {
     VELOX_CHECK_LT(splitInfoIdx_, splitInfos_.size(), "Plan must have readRel 
and related split info.");
     splitInfo = splitInfos_[splitInfoIdx_++];
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 0e00764a66..47bf3a0525 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -28,8 +28,17 @@ namespace gluten {
 class ResultIterator;
 
 struct SplitInfo {
-  /// Whether the split comes from arrow array stream node.
-  bool isStream = false;
+  enum class LeafType {
+    /// A streaming node that accepts iterator splits.
+    SPLIT_AWARE_STREAM = 0,
+    /// A table scan node that accepts scan splits.
+    TABLE_SCAN = 1,
+    /// A leaf node that doesn't rely on splits.
+    TRIVIAL_LEAF = 2
+  };
+
+  /// The type of the associated Velox leaf query plan node.
+  LeafType leafType = LeafType::TRIVIAL_LEAF;
 
   /// The Partition index.
   u_int32_t partitionIndex;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to