This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c928c895ad [GLUTEN-7243][VL] Suspend the Velox task while reading an 
input Java iterator to make the task spillable (#7748)
c928c895ad is described below

commit c928c895ad866281a68ac3c366ebb2c774555d31
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Nov 5 15:54:59 2024 +0800

    [GLUTEN-7243][VL] Suspend the Velox task while reading an input Java 
iterator to make the task spillable (#7748)
---
 cpp/velox/compute/WholeStageResultIterator.cc      | 12 ----
 cpp/velox/operators/plannodes/RowVectorStream.h    | 70 +++++++++++++++-------
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |  3 +-
 .../org/apache/gluten/extension/GlutenPlan.scala   |  6 +-
 4 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 29f467c5e7..adc9e9bbe9 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -226,18 +226,6 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t 
size) {
   std::string logPrefix{"Spill[" + poolName + "]: "};
   int64_t shrunken = memoryManager_->shrink(size);
   if (spillStrategy_ == "auto") {
-    if (task_->numThreads() != 0) {
-      // Task should have zero running threads, otherwise there's
-      // possibility that this spill call hangs. See 
https://github.com/apache/incubator-gluten/issues/7243.
-      // As of now, non-zero running threads usually happens when:
-      // 1. Task A spills task B;
-      // 2. Task A trys to grow buffers created by task B, during which spill 
is requested on task A again;
-      LOG(INFO) << fmt::format(
-          "{} spill is requested on a task {} that has non-zero running 
threads, which is not currently supported. Skipping.",
-          logPrefix,
-          task_->taskId());
-      return shrunken;
-    }
     int64_t remaining = size - shrunken;
     LOG(INFO) << fmt::format("{} trying to request spill for {}.", logPrefix, 
velox::succinctBytes(remaining));
     auto mm = memoryManager_->getMemoryManager();
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h 
b/cpp/velox/operators/plannodes/RowVectorStream.h
index c72e9137f4..ce26305fa0 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -26,16 +26,33 @@ namespace gluten {
 class RowVectorStream {
  public:
   explicit RowVectorStream(
+      facebook::velox::exec::DriverCtx* driverCtx,
       facebook::velox::memory::MemoryPool* pool,
-      std::shared_ptr<ResultIterator> iterator,
+      ResultIterator* iterator,
       const facebook::velox::RowTypePtr& outputType)
-      : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
+      : driverCtx_(driverCtx), pool_(pool), outputType_(outputType), 
iterator_(iterator) {}
 
   bool hasNext() {
-    if (!finished_) {
-      finished_ = !iterator_->hasNext();
+    if (finished_) {
+      return false;
     }
-    return !finished_;
+    bool hasNext;
+    {
+      // We are leaving Velox task execution and are probably entering Spark 
code through JNI. Suspend the current
+      // driver to make the current task open to spilling. Keep the guard 
named so it outlives the call below.
+      //
+      // When a task is getting spilled, it should have been suspended so has 
zero running threads, otherwise there's
+      // possibility that this spill call hangs. See 
https://github.com/apache/incubator-gluten/issues/7243.
+      // As of now, non-zero running threads usually happens when:
+      // 1. Task A spills task B;
+      // 2. Task A tries to grow buffers created by task B, during which spill 
is requested on task A again.
+      facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
+      hasNext = iterator_->hasNext();
+    }
+    if (!hasNext) {
+      finished_ = true;
+    }
+    return hasNext;
   }
 
   // Convert arrow batch to rowvector and use new output columns
@@ -43,7 +60,14 @@ class RowVectorStream {
     if (finished_) {
       return nullptr;
     }
-    const std::shared_ptr<VeloxColumnarBatch>& vb = 
VeloxColumnarBatch::from(pool_, iterator_->next());
+    std::shared_ptr<ColumnarBatch> cb;
+    {
+      // We are leaving Velox task execution and are probably entering Spark 
code through JNI. Suspend the current
+      // driver to make the current task open to spilling. Keep the guard 
named so it outlives the call below.
+      facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
+      cb = iterator_->next();
+    }
+    const std::shared_ptr<VeloxColumnarBatch>& vb = 
VeloxColumnarBatch::from(pool_, cb);
     auto vp = vb->getRowVector();
     VELOX_DCHECK(vp != nullptr);
     return std::make_shared<facebook::velox::RowVector>(
@@ -51,10 +75,12 @@ class RowVectorStream {
   }
 
  private:
-  bool finished_{false};
-  std::shared_ptr<ResultIterator> iterator_;
-  const facebook::velox::RowTypePtr outputType_;
+  facebook::velox::exec::DriverCtx* driverCtx_;
   facebook::velox::memory::MemoryPool* pool_;
+  const facebook::velox::RowTypePtr outputType_;
+  ResultIterator* iterator_;
+
+  bool finished_{false};
 };
 
 class ValueStreamNode final : public facebook::velox::core::PlanNode {
@@ -62,21 +88,19 @@ class ValueStreamNode final : public 
facebook::velox::core::PlanNode {
   ValueStreamNode(
       const facebook::velox::core::PlanNodeId& id,
       const facebook::velox::RowTypePtr& outputType,
-      std::unique_ptr<RowVectorStream> valueStream)
-      : facebook::velox::core::PlanNode(id), outputType_(outputType), 
valueStream_(std::move(valueStream)) {
-    VELOX_CHECK_NOT_NULL(valueStream_);
-  }
+      std::shared_ptr<ResultIterator> iterator)
+      : facebook::velox::core::PlanNode(id), outputType_(outputType), 
iterator_(std::move(iterator)) {}
 
   const facebook::velox::RowTypePtr& outputType() const override {
     return outputType_;
   }
 
   const std::vector<facebook::velox::core::PlanNodePtr>& sources() const 
override {
-    return kEmptySources;
+    return kEmptySources_;
   };
 
-  RowVectorStream* rowVectorStream() const {
-    return valueStream_.get();
+  ResultIterator* iterator() const {
+    return iterator_.get();
   }
 
   std::string_view name() const override {
@@ -91,8 +115,8 @@ class ValueStreamNode final : public 
facebook::velox::core::PlanNode {
   void addDetails(std::stringstream& stream) const override{};
 
   const facebook::velox::RowTypePtr outputType_;
-  std::unique_ptr<RowVectorStream> valueStream_;
-  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
+  std::shared_ptr<ResultIterator> iterator_;
+  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
 };
 
 class ValueStream : public facebook::velox::exec::SourceOperator {
@@ -107,15 +131,17 @@ class ValueStream : public 
facebook::velox::exec::SourceOperator {
             operatorId,
             valueStreamNode->id(),
             valueStreamNode->name().data()) {
-    valueStream_ = valueStreamNode->rowVectorStream();
+    ResultIterator* itr = valueStreamNode->iterator();
+    VELOX_CHECK_NOT_NULL(itr);
+    rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr, 
outputType_);
   }
 
   facebook::velox::RowVectorPtr getOutput() override {
     if (finished_) {
       return nullptr;
     }
-    if (valueStream_->hasNext()) {
-      return valueStream_->next();
+    if (rvStream_->hasNext()) {
+      return rvStream_->next();
     } else {
       finished_ = true;
       return nullptr;
@@ -132,7 +158,7 @@ class ValueStream : public 
facebook::velox::exec::SourceOperator {
 
  private:
   bool finished_ = false;
-  RowVectorStream* valueStream_;
+  std::unique_ptr<RowVectorStream> rvStream_;
 };
 
 class RowVectorStreamOperatorTranslator : public 
facebook::velox::exec::Operator::PlanNodeTranslator {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 01386115b3..9e29590433 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1129,8 +1129,7 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::constructValueStreamNode(
     VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index 
{} in input iterator list.", streamIdx);
     iterator = inputIters_[streamIdx];
   }
-  auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator, 
outputType);
-  auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, 
std::move(valueStream));
+  auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, 
std::move(iterator));
 
   auto splitInfo = std::make_shared<SplitInfo>();
   splitInfo->isStream = true;
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 856d208ead..06d798e50f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -62,16 +62,16 @@ trait GlutenPlan extends SparkPlan with 
Convention.KnownBatchType with LogLevelU
    * Validate whether this SparkPlan supports to be transformed into substrait 
node in Native Code.
    */
   final def doValidate(): ValidationResult = {
-    val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance
+    val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
       .doSchemaValidate(schema)
       .map {
         reason =>
           ValidationResult.failed(s"Found schema check failure for $schema, 
due to: $reason")
       }
       .getOrElse(ValidationResult.succeeded)
-    if (!schemaVaidationResult.ok()) {
+    if (!schemaValidationResult.ok()) {
       TestStats.addFallBackClassName(this.getClass.toString)
-      return schemaVaidationResult
+      return schemaValidationResult
     }
     try {
       TransformerState.enterValidation


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to