This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c5c43c1a6f [GLUTEN-11569][VL] Following #11527, fix the error calling
#noMoreSplits on cuDF value stream nodes (#11572)
c5c43c1a6f is described below
commit c5c43c1a6fc8186c6ffaeb46c1e94fd7bbb8d076
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Feb 9 14:29:07 2026 +0000
[GLUTEN-11569][VL] Following #11527, fix the error calling #noMoreSplits on
cuDF value stream nodes (#11572)
---
cpp/velox/compute/VeloxPlanConverter.cc | 1 +
cpp/velox/compute/VeloxRuntime.cc | 11 ++++++++---
cpp/velox/compute/WholeStageResultIterator.cc | 6 +++---
cpp/velox/compute/WholeStageResultIterator.h | 2 +-
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 7 ++++---
cpp/velox/substrait/SubstraitToVeloxPlan.h | 13 +++++++++++--
6 files changed, 28 insertions(+), 12 deletions(-)
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index 4764fad382..627bd396b7 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -46,6 +46,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
using SubstraitFileFormatCase =
::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;
auto splitInfo = std::make_shared<SplitInfo>();
+ splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;
splitInfo->paths.reserve(fileList.size());
splitInfo->starts.reserve(fileList.size());
splitInfo->lengths.reserve(fileList.size());
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index 69d21b4a57..e88cb43a91 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -137,11 +137,16 @@ void VeloxRuntime::getInfoAndIds(
// 1. Streams follow "iterator:<idx>" in the substrait plan;
// 2. Files follow the traversal order in the plan node tree.
// FIXME: Why we didn't have a unified design?
- if (splitInfo->isStream) {
+ switch (splitInfo->leafType) {
+ case SplitInfo::LeafType::SPLIT_AWARE_STREAM:
streamIds.emplace_back(ValueStreamConnectorFactory::nodeIdOf(streamIdx++));
- } else {
- scanInfos.emplace_back(splitInfo);
+ break;
+ case SplitInfo::LeafType::TABLE_SCAN:
+ scanInfos.emplace_back(splitInfo);
scanIds.emplace_back(leafPlanNodeId);
+ break;
+ case SplitInfo::LeafType::TRIVIAL_LEAF:
+ break;
}
}
}
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 9849075e1f..185dc5fd5e 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -359,7 +359,7 @@ void WholeStageResultIterator::constructPartitionColumns(
}
void WholeStageResultIterator::addIteratorSplits(const std::vector<std::shared_ptr<ResultIterator>>& inputIterators) {
- GLUTEN_CHECK(!allSplitsAdded, "Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
+ GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
// Create IteratorConnectorSplit for each iterator
for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) {
if (inputIterators[i] == nullptr) {
@@ -373,7 +373,7 @@ void WholeStageResultIterator::addIteratorSplits(const std::vector<std::shared_p
}
void WholeStageResultIterator::noMoreSplits() {
- if (allSplitsAdded) {
+ if (allSplitsAdded_) {
return;
}
// Mark no more splits for all scan nodes
@@ -391,7 +391,7 @@ void WholeStageResultIterator::noMoreSplits() {
for (const auto& streamId : streamIds_) {
task_->noMoreSplits(streamId);
}
- allSplitsAdded = true;
+ allSplitsAdded_ = true;
}
void WholeStageResultIterator::collectMetrics() {
diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h
index 401cff06da..85cb18fd25 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -139,7 +139,7 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator {
std::vector<std::shared_ptr<SplitInfo>> scanInfos_;
std::vector<facebook::velox::core::PlanNodeId> streamIds_;
std::vector<std::vector<facebook::velox::exec::Split>> splits_;
- bool allSplitsAdded = false;
+ bool allSplitsAdded_ = false;
int64_t loadLazyVectorTime_ = 0;
};
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 0085c553ba..b543dfa8ba 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1323,7 +1323,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
// Mark this as a stream-based split
auto splitInfo = std::make_shared<SplitInfo>();
- splitInfo->isStream = true;
+ splitInfo->leafType = SplitInfo::LeafType::SPLIT_AWARE_STREAM;
splitInfoMap_[tableScanNode->id()] = splitInfo;
return tableScanNode;
@@ -1360,7 +1360,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructCudfValueStreamNode(
auto node = std::make_shared<CudfValueStreamNode>(nextPlanNodeId(), outputType, std::move(iterator));
auto splitInfo = std::make_shared<SplitInfo>();
- splitInfo->isStream = true;
+ splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF;
splitInfoMap_[node->id()] = splitInfo;
return node;
}
@@ -1381,7 +1381,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode(
}
auto node = std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(), std::move(rowVectors));
auto splitInfo = std::make_shared<SplitInfo>();
- splitInfo->isStream = true;
+ splitInfo->leafType = SplitInfo::LeafType::TRIVIAL_LEAF;
splitInfoMap_[node->id()] = splitInfo;
return node;
}
@@ -1412,6 +1412,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// Otherwise, will create TableScan node for ReadRel.
auto splitInfo = std::make_shared<SplitInfo>();
+ splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;
if (!validationMode_) {
VELOX_CHECK_LT(splitInfoIdx_, splitInfos_.size(), "Plan must have readRel and related split info.");
splitInfo = splitInfos_[splitInfoIdx_++];
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 0e00764a66..47bf3a0525 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -28,8 +28,17 @@ namespace gluten {
class ResultIterator;
struct SplitInfo {
- /// Whether the split comes from arrow array stream node.
- bool isStream = false;
+ enum class LeafType {
+ /// A streaming node that accepts iterator splits.
+ SPLIT_AWARE_STREAM = 0,
+ /// A table scan node that accepts scan splits.
+ TABLE_SCAN = 1,
+ /// A leaf node that doesn't rely on splits.
+ TRIVIAL_LEAF = 2
+ };
+
+ /// The type of the associated Velox leaf query plan node.
+ LeafType leafType = LeafType::TRIVIAL_LEAF;
/// The Partition index.
u_int32_t partitionIndex;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]