This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c928c895ad [GLUTEN-7243][VL] Suspend the Velox task while reading an
input Java iterator to make the task spillable (#7748)
c928c895ad is described below
commit c928c895ad866281a68ac3c366ebb2c774555d31
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Nov 5 15:54:59 2024 +0800
[GLUTEN-7243][VL] Suspend the Velox task while reading an input Java
iterator to make the task spillable (#7748)
---
cpp/velox/compute/WholeStageResultIterator.cc | 12 ----
cpp/velox/operators/plannodes/RowVectorStream.h | 70 +++++++++++++++-------
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 3 +-
.../org/apache/gluten/extension/GlutenPlan.scala | 6 +-
4 files changed, 52 insertions(+), 39 deletions(-)
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 29f467c5e7..adc9e9bbe9 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -226,18 +226,6 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t
size) {
std::string logPrefix{"Spill[" + poolName + "]: "};
int64_t shrunken = memoryManager_->shrink(size);
if (spillStrategy_ == "auto") {
- if (task_->numThreads() != 0) {
- // Task should have zero running threads, otherwise there's
- // possibility that this spill call hangs. See
https://github.com/apache/incubator-gluten/issues/7243.
- // As of now, non-zero running threads usually happens when:
- // 1. Task A spills task B;
- // 2. Task A trys to grow buffers created by task B, during which spill
is requested on task A again;
- LOG(INFO) << fmt::format(
- "{} spill is requested on a task {} that has non-zero running
threads, which is not currently supported. Skipping.",
- logPrefix,
- task_->taskId());
- return shrunken;
- }
int64_t remaining = size - shrunken;
LOG(INFO) << fmt::format("{} trying to request spill for {}.", logPrefix,
velox::succinctBytes(remaining));
auto mm = memoryManager_->getMemoryManager();
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h
b/cpp/velox/operators/plannodes/RowVectorStream.h
index c72e9137f4..ce26305fa0 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -26,16 +26,33 @@ namespace gluten {
class RowVectorStream {
public:
explicit RowVectorStream(
+ facebook::velox::exec::DriverCtx* driverCtx,
facebook::velox::memory::MemoryPool* pool,
- std::shared_ptr<ResultIterator> iterator,
+ ResultIterator* iterator,
const facebook::velox::RowTypePtr& outputType)
- : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
+ : driverCtx_(driverCtx), pool_(pool), outputType_(outputType),
iterator_(iterator) {}
bool hasNext() {
- if (!finished_) {
- finished_ = !iterator_->hasNext();
+ if (finished_) {
+ return false;
}
- return !finished_;
+ bool hasNext;
+ {
+ // We are leaving Velox task execution and are probably entering Spark
code through JNI. Suspend the current
+ // driver to make the current task open to spilling.
+ //
+ // When a task is getting spilled, it should have been suspended so it has
zero running threads, otherwise there's a
+ // possibility that this spill call hangs. See
https://github.com/apache/incubator-gluten/issues/7243.
+ // As of now, non-zero running threads usually happens when:
+ // 1. Task A spills task B;
+ // 2. Task A tries to grow buffers created by task B, during which spill
is requested on task A again.
+ facebook::velox::exec::SuspendedSection suspendedSection(driverCtx_->driver);
+ hasNext = iterator_->hasNext();
+ }
+ if (!hasNext) {
+ finished_ = true;
+ }
+ return hasNext;
}
// Convert arrow batch to rowvector and use new output columns
@@ -43,7 +60,14 @@ class RowVectorStream {
if (finished_) {
return nullptr;
}
- const std::shared_ptr<VeloxColumnarBatch>& vb =
VeloxColumnarBatch::from(pool_, iterator_->next());
+ std::shared_ptr<ColumnarBatch> cb;
+ {
+ // We are leaving Velox task execution and are probably entering Spark
code through JNI. Suspend the current
+ // driver to make the current task open to spilling.
+ facebook::velox::exec::SuspendedSection suspendedSection(driverCtx_->driver);
+ cb = iterator_->next();
+ }
+ const std::shared_ptr<VeloxColumnarBatch>& vb =
VeloxColumnarBatch::from(pool_, cb);
auto vp = vb->getRowVector();
VELOX_DCHECK(vp != nullptr);
return std::make_shared<facebook::velox::RowVector>(
@@ -51,10 +75,12 @@ class RowVectorStream {
}
private:
- bool finished_{false};
- std::shared_ptr<ResultIterator> iterator_;
- const facebook::velox::RowTypePtr outputType_;
+ facebook::velox::exec::DriverCtx* driverCtx_;
facebook::velox::memory::MemoryPool* pool_;
+ const facebook::velox::RowTypePtr outputType_;
+ ResultIterator* iterator_;
+
+ bool finished_{false};
};
class ValueStreamNode final : public facebook::velox::core::PlanNode {
@@ -62,21 +88,19 @@ class ValueStreamNode final : public
facebook::velox::core::PlanNode {
ValueStreamNode(
const facebook::velox::core::PlanNodeId& id,
const facebook::velox::RowTypePtr& outputType,
- std::unique_ptr<RowVectorStream> valueStream)
- : facebook::velox::core::PlanNode(id), outputType_(outputType),
valueStream_(std::move(valueStream)) {
- VELOX_CHECK_NOT_NULL(valueStream_);
- }
+ std::shared_ptr<ResultIterator> iterator)
+ : facebook::velox::core::PlanNode(id), outputType_(outputType),
iterator_(std::move(iterator)) {}
const facebook::velox::RowTypePtr& outputType() const override {
return outputType_;
}
const std::vector<facebook::velox::core::PlanNodePtr>& sources() const
override {
- return kEmptySources;
+ return kEmptySources_;
};
- RowVectorStream* rowVectorStream() const {
- return valueStream_.get();
+ ResultIterator* iterator() const {
+ return iterator_.get();
}
std::string_view name() const override {
@@ -91,8 +115,8 @@ class ValueStreamNode final : public
facebook::velox::core::PlanNode {
void addDetails(std::stringstream& stream) const override{};
const facebook::velox::RowTypePtr outputType_;
- std::unique_ptr<RowVectorStream> valueStream_;
- const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
+ std::shared_ptr<ResultIterator> iterator_;
+ const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
};
class ValueStream : public facebook::velox::exec::SourceOperator {
@@ -107,15 +131,17 @@ class ValueStream : public
facebook::velox::exec::SourceOperator {
operatorId,
valueStreamNode->id(),
valueStreamNode->name().data()) {
- valueStream_ = valueStreamNode->rowVectorStream();
+ ResultIterator* itr = valueStreamNode->iterator();
+ VELOX_CHECK_NOT_NULL(itr);
+ rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr,
outputType_);
}
facebook::velox::RowVectorPtr getOutput() override {
if (finished_) {
return nullptr;
}
- if (valueStream_->hasNext()) {
- return valueStream_->next();
+ if (rvStream_->hasNext()) {
+ return rvStream_->next();
} else {
finished_ = true;
return nullptr;
@@ -132,7 +158,7 @@ class ValueStream : public
facebook::velox::exec::SourceOperator {
private:
bool finished_ = false;
- RowVectorStream* valueStream_;
+ std::unique_ptr<RowVectorStream> rvStream_;
};
class RowVectorStreamOperatorTranslator : public
facebook::velox::exec::Operator::PlanNodeTranslator {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 01386115b3..9e29590433 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1129,8 +1129,7 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::constructValueStreamNode(
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index
{} in input iterator list.", streamIdx);
iterator = inputIters_[streamIdx];
}
- auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator,
outputType);
- auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType,
std::move(valueStream));
+ auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType,
std::move(iterator));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 856d208ead..06d798e50f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -62,16 +62,16 @@ trait GlutenPlan extends SparkPlan with
Convention.KnownBatchType with LogLevelU
* Validate whether this SparkPlan supports to be transformed into substrait
node in Native Code.
*/
final def doValidate(): ValidationResult = {
- val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance
+ val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
.doSchemaValidate(schema)
.map {
reason =>
ValidationResult.failed(s"Found schema check failure for $schema,
due to: $reason")
}
.getOrElse(ValidationResult.succeeded)
- if (!schemaVaidationResult.ok()) {
+ if (!schemaValidationResult.ok()) {
TestStats.addFallBackClassName(this.getClass.toString)
- return schemaVaidationResult
+ return schemaValidationResult
}
try {
TransformerState.enterValidation
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]