This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new df7d62c256 [GLUTEN-8851][VL] GPU validate the plan and use runtime
config to enable it (#9634)
df7d62c256 is described below
commit df7d62c25613f1be936a91358dd303f8b95d5ed5
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Jun 30 17:02:31 2025 +0100
[GLUTEN-8851][VL] GPU validate the plan and use runtime config to enable it
(#9634)
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 6 +-
.../cudf/VeloxCudfPlanValidatorJniWrapper.java | 22 ++++++
.../backendsapi/velox/VeloxIteratorApi.scala | 12 ++--
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 3 +
.../gluten/extension/CudfNodeValidationRule.scala | 49 +++++++++++++
cpp/core/compute/Runtime.h | 1 -
cpp/core/config/GlutenConfig.h | 8 +++
cpp/core/jni/JniWrapper.cc | 10 ++-
cpp/velox/CMakeLists.txt | 3 +
cpp/velox/benchmarks/GenericBenchmark.cc | 5 +-
cpp/velox/compute/WholeStageResultIterator.cc | 11 ++-
cpp/velox/config/VeloxConfig.h | 8 ---
cpp/velox/cudf/CudfPlanValidator.cc | 81 ++++++++++++++++++++++
cpp/velox/cudf/CudfPlanValidator.h | 29 ++++++++
cpp/velox/jni/VeloxJniWrapper.cc | 20 ++++++
cpp/velox/operators/plannodes/RowVectorStream.h | 3 +-
cpp/velox/tests/RuntimeTest.cc | 2 +-
.../gluten/vectorized/NativePlanEvaluator.java | 6 +-
.../gluten/vectorized/PlanEvaluatorJniWrapper.java | 3 +-
.../apache/gluten/backendsapi/IteratorApi.scala | 6 +-
.../org/apache/gluten/config/GlutenConfig.scala | 2 +
.../execution/GlutenWholeStageColumnarRDD.scala | 6 +-
.../gluten/execution/WholeStageTransformer.scala | 25 ++++++-
.../execution/WholeStageZippedPartitionsRDD.scala | 3 +-
24 files changed, 291 insertions(+), 33 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index edf354b0c3..a31384a8b2 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -281,7 +281,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
- inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
+ inputIterators: Seq[Iterator[ColumnarBatch]] = Seq(),
+ enableCudf: Boolean = false
): Iterator[ColumnarBatch] = {
require(
@@ -321,7 +322,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
- materializeInput: Boolean): Iterator[ColumnarBatch] = {
+ materializeInput: Boolean,
+ enableCudf: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount
// Final iterator does not contain scan split, so pass empty split info to
native here.
diff --git
a/backends-velox/src/main/java/org/apache/gluten/cudf/VeloxCudfPlanValidatorJniWrapper.java
b/backends-velox/src/main/java/org/apache/gluten/cudf/VeloxCudfPlanValidatorJniWrapper.java
new file mode 100644
index 0000000000..8ea0dad2d6
--- /dev/null
+++
b/backends-velox/src/main/java/org/apache/gluten/cudf/VeloxCudfPlanValidatorJniWrapper.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.cudf;
+
+/** The jni implementation is at `cpp/velox/jni/VeloxJniWrapper.cc` */
+public class VeloxCudfPlanValidatorJniWrapper {
+ public static native boolean validate(byte[] wsPlan);
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 726c694e7f..c8aa07728f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -228,7 +228,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
- inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()):
Iterator[ColumnarBatch] = {
+ inputIterators: Seq[Iterator[ColumnarBatch]] = Seq(),
+ enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
assert(
inputPartition.isInstanceOf[GlutenPartition],
"Velox backend only accept GlutenPartition.")
@@ -253,7 +254,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
splitInfoByteArray,
columnarNativeIterators,
partitionIndex,
-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
+ enableCudf
)
val itrMetrics = IteratorMetricsJniWrapper.create()
@@ -283,7 +285,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
- materializeInput: Boolean): Iterator[ColumnarBatch] = {
+ materializeInput: Boolean,
+ enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
ExecutorManager.tryTaskSet(numaBindingInfo)
@@ -304,7 +307,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
new Array[Array[Byte]](0),
columnarNativeIterator,
partitionIndex,
-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
+ enableCudf
)
val itrMetrics = IteratorMetricsJniWrapper.create()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index c750db1b9a..87d61f41b0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -118,6 +118,8 @@ object VeloxRuleApi {
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
injector.injectPost(c => ColumnarCollapseTransformStages(new
GlutenConfig(c.sqlConf)))
+ injector.injectPost(c => CudfNodeValidationRule(new
GlutenConfig(c.sqlConf)))
+
injector.injectPost(c => GlutenNoopWriterRule(c.session))
// Gluten columnar: Final rules.
@@ -206,6 +208,7 @@ object VeloxRuleApi {
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPostTransform(c => each(c.session)))
injector.injectPostTransform(c => ColumnarCollapseTransformStages(new
GlutenConfig(c.sqlConf)))
+ injector.injectPostTransform(c => CudfNodeValidationRule(new
GlutenConfig(c.sqlConf)))
injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
injector.injectPostTransform(c =>
RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectPostTransform(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
new file mode 100644
index 0000000000..aeb21dd049
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
+import org.apache.gluten.execution.{CudfTag, LeafTransformSupport,
TransformSupport, WholeStageTransformer}
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+// Adds the node name prefix 'Cudf' to a GlutenPlan when it can be offloaded to cudf.
+case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends
Rule[SparkPlan] {
+
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (!glutenConf.enableColumnarCudf) {
+ return plan
+ }
+ plan.transformUp {
+ case transformer: WholeStageTransformer =>
+ if (
+ VeloxCudfPlanValidatorJniWrapper.validate(
+ transformer.substraitPlan.toProtobuf.toByteArray)
+ ) {
+ transformer.foreach {
+ case _: LeafTransformSupport =>
+ case t: TransformSupport =>
+ t.setTagValue(CudfTag.CudfTag, true)
+ case _ =>
+ }
+ }
+ transformer
+ }
+ }
+}
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index c9962642ea..00498a9e54 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -94,7 +94,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
throw GlutenException("Not implemented");
}
- // Just for benchmark
::substrait::Plan& getPlan() {
return substraitPlan_;
}
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 578d8f66a5..5fb9040e32 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -82,6 +82,14 @@ const std::string kSparkMapKeyDedupPolicy =
"spark.sql.mapKeyDedupPolicy";
const std::string kSparkLegacyStatisticalAggregate =
"spark.sql.legacy.statisticalAggregate";
+// cudf
+#ifdef GLUTEN_ENABLE_GPU
+const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
+const bool kCudfEnabledDefault = "false";
+const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
+const bool kDebugCudfDefault = "false";
+#endif
+
std::unordered_map<std::string, std::string>
parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t
planDataLength);
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 1d2bddfcff..0419436ef1 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -400,11 +400,17 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
jint partitionId,
jlong taskId,
jboolean enableDumping,
- jstring spillDir) {
+ jstring spillDir,
+ jboolean enableCudf) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
- auto& conf = ctx->getConfMap();
+ auto conf = ctx->getConfMap();
+#ifdef GLUTEN_ENABLE_GPU
+ if (enableCudf) {
+ conf[kCudfEnabled] = "true";
+ }
+#endif
ctx->setSparkTaskInfo({stageId, partitionId, taskId});
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 213a33dc64..928d2c875b 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -177,6 +177,9 @@ if(ENABLE_S3)
find_package(ZLIB)
endif()
+if(ENABLE_GPU)
+ list(APPEND VELOX_SRCS cudf/CudfPlanValidator.cc)
+endif()
add_library(velox SHARED ${VELOX_SRCS})
if(ENABLE_GLUTEN_VCPKG AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index ad591c91fb..7e032e6c6d 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -444,8 +444,9 @@ auto BM_Generic = [](::benchmark::State& state,
std::vector<FileReaderIterator*> inputItersRaw;
if (!dataFiles.empty()) {
for (const auto& input : dataFiles) {
-
inputIters.push_back(FileReaderIterator::getInputIteratorFromFileReader(
- readerType, input, FLAGS_batch_size,
runtime->memoryManager()->getLeafMemoryPool()));
+ inputIters.push_back(
+ FileReaderIterator::getInputIteratorFromFileReader(
+ readerType, input, FLAGS_batch_size,
runtime->memoryManager()->getLeafMemoryPool()));
}
std::transform(
inputIters.begin(),
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index c60a81d4c9..9cd4233bd5 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -21,6 +21,9 @@
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PlanNodeStats.h"
+#ifdef GLUTEN_ENABLE_GPU
+#include "velox/experimental/cudf/exec/ToCudf.h"
+#endif
using namespace facebook;
@@ -75,7 +78,6 @@ WholeStageResultIterator::WholeStageResultIterator(
if (spillThreadNum > 0) {
spillExecutor_ =
std::make_shared<folly::CPUThreadPoolExecutor>(spillThreadNum);
}
-
getOrderedNodeIds(veloxPlan_, orderedNodeIds_);
// Create task instance.
@@ -571,6 +573,13 @@ std::unordered_map<std::string, std::string>
WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kSparkLegacyStatisticalAggregate] =
std::to_string(veloxCfg_->get<bool>(kSparkLegacyStatisticalAggregate,
false));
+#ifdef GLUTEN_ENABLE_GPU
+ if (veloxCfg_->get<bool>(kCudfEnabled, false)) {
+ // TODO: wait for PR
https://github.com/facebookincubator/velox/pull/13341
+ // configs[cudf_velox::kCudfEnabled] = "false";
+ }
+#endif
+
const auto setIfExists = [&](const std::string& glutenKey, const
std::string& veloxKey) {
const auto valueOptional = veloxCfg_->get<std::string>(glutenKey);
if (valueOptional.hasValue()) {
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index e837314199..38752a8c14 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -146,14 +146,6 @@ const uint32_t kGlogVerboseLevelMaximum = 99;
const std::string kGlogSeverityLevel =
"spark.gluten.sql.columnar.backend.velox.glogSeverityLevel";
const uint32_t kGlogSeverityLevelDefault = 1;
-// cudf
-#ifdef GLUTEN_ENABLE_GPU
-const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
-const bool kCudfEnabledDefault = "false";
-const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
-const bool kDebugCudfDefault = "false";
-#endif
-
// Query trace
/// Enable query tracing flag.
const std::string kQueryTraceEnabled =
"spark.gluten.sql.columnar.backend.velox.queryTraceEnabled";
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc
b/cpp/velox/cudf/CudfPlanValidator.cc
new file mode 100644
index 0000000000..177613e8cc
--- /dev/null
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -0,0 +1,81 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CudfPlanValidator.h"
+#include "compute/ResultIterator.h"
+#include "compute/VeloxBackend.h"
+#include "compute/VeloxPlanConverter.h"
+#include "operators/plannodes/RowVectorStream.h"
+#include "velox/core/PlanNode.h"
+#include "velox/exec/Task.h"
+#include "velox/experimental/cudf/exec/ToCudf.h"
+
+using namespace facebook;
+
+namespace gluten {
+bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) {
+ auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
+ std::vector<::substrait::ReadRel_LocalFiles> localFiles;
+ std::unordered_map<std::string, std::string> configValues;
+ std::vector<std::shared_ptr<ResultIterator>> inputs;
+ std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg =
+
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
+ VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(),
veloxCfg.get(), std::nullopt, true);
+ auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles);
+ std::unordered_set<velox::core::PlanNodeId> emptySet;
+ velox::core::PlanFragment planFragment{planNode,
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
+
+ std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>
connectorConfigs;
+ static std::atomic<uint32_t> vtId{0};
+ std::shared_ptr<velox::core::QueryCtx> queryCtx =
velox::core::QueryCtx::create(
+ nullptr,
+ facebook::velox::core::QueryConfig{configValues},
+ connectorConfigs,
+ gluten::VeloxBackend::get()->getAsyncDataCache(),
+ getDefaultMemoryManager()->getAggregateMemoryPool(),
+ nullptr,
+ fmt::format("Gluten_Cudf_Validation_VTID_{}", std::to_string(vtId++)));
+ std::shared_ptr<facebook::velox::exec::Task> task =
velox::exec::Task::create(
+ fmt::format("Gluten_Cudf_Validation_VTID_{}", std::to_string(vtId++)),
+ std::move(planFragment),
+ 0,
+ std::move(queryCtx),
+ velox::exec::Task::ExecutionMode::kSerial);
+ std::vector<velox::exec::Operator*> operators;
+ task->testingVisitDrivers([&](velox::exec::Driver* driver) { operators =
driver->operators(); });
+ for (const auto* op : operators) {
+ if (dynamic_cast<const exec::TableScan*>(op) != nullptr) {
+ continue;
+ }
+ // TODO: wait for PR https://github.com/facebookincubator/velox/pull/13341
+ // if (cudf_velox::isCudfOperator(op)) {
+ // continue;
+ // }
+ if (dynamic_cast<const ValueStream*>(op) != nullptr) {
+ continue;
+ }
+ LOG(INFO) << "Operator " << op->operatorType() << " is not supported in
cudf";
+ task->requestCancel().wait();
+ return false;
+ }
+ task->requestCancel().wait();
+ LOG(INFO) << "Cudf Operator validation success";
+ return true;
+}
+
+} // namespace gluten
diff --git a/cpp/velox/cudf/CudfPlanValidator.h
b/cpp/velox/cudf/CudfPlanValidator.h
new file mode 100644
index 0000000000..de4fb150e0
--- /dev/null
+++ b/cpp/velox/cudf/CudfPlanValidator.h
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "memory/VeloxMemoryManager.h"
+#include "substrait/plan.pb.h"
+
+namespace gluten {
+class CudfPlanValidator {
+ public:
+  // Validates that the plan contains no cudf-unsupported operators, other than
TableScan and ValueStream.
+ static bool validate(const ::substrait::Plan& substraitPlan);
+};
+} // namespace gluten
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index fdadf13ba8..c372f78d15 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -37,6 +37,10 @@
#include "velox/common/base/BloomFilter.h"
#include "velox/common/file/FileSystems.h"
+#ifdef GLUTEN_ENABLE_GPU
+#include "cudf/CudfPlanValidator.h"
+#endif
+
using namespace gluten;
using namespace facebook;
@@ -590,6 +594,22 @@ JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_config_ConfigJniWrapper_isEnha
#endif
}
+#ifdef GLUTEN_ENABLE_GPU
+JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_cudf_VeloxCudfPlanValidatorJniWrapper_validate( // NOLINT
+ JNIEnv* env,
+ jclass,
+ jbyteArray planArr) {
+ JNI_METHOD_START
+ auto safePlanArray = getByteArrayElementsSafe(env, planArr);
+ auto planSize = env->GetArrayLength(planArr);
+ ::substrait::Plan substraitPlan;
+ parseProtobuf(safePlanArray.elems(), planSize, &substraitPlan);
+  // Builds a task and inspects its drivers' operators; returns true only if
every operator other than TableScan and ValueStream can be offloaded to cudf.
+ return CudfPlanValidator::validate(substraitPlan);
+ JNI_METHOD_END(false)
+}
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h
b/cpp/velox/operators/plannodes/RowVectorStream.h
index d503eb2d14..0c56112450 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -61,6 +61,8 @@ class RowVectorStream {
if (finished_) {
return false;
}
+ VELOX_DCHECK_NOT_NULL(iterator_);
+
bool hasNext;
{
// We are leaving Velox task execution and are probably entering Spark
code through JNI. Suspend the current
@@ -157,7 +159,6 @@ class ValueStream : public
facebook::velox::exec::SourceOperator {
valueStreamNode->id(),
valueStreamNode->name().data()) {
ResultIterator* itr = valueStreamNode->iterator();
- VELOX_CHECK_NOT_NULL(itr);
rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr,
outputType_);
}
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index ec9aee855b..2241ddcdaa 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -25,7 +25,7 @@ namespace gluten {
class DummyMemoryManager final : public MemoryManager {
public:
- DummyMemoryManager(const std::string& kind) : MemoryManager(kind){};
+ DummyMemoryManager(const std::string& kind) : MemoryManager(kind) {};
arrow::MemoryPool* defaultArrowMemoryPool() override {
throw GlutenException("Not yet implemented");
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 7fefb00c22..96643dd711 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -62,7 +62,8 @@ public class NativePlanEvaluator {
byte[][] splitInfo,
List<ColumnarBatchInIterator> iterList,
int partitionIndex,
- String spillDirPath)
+ String spillDirPath,
+ boolean enableCudf)
throws RuntimeException {
final long itrHandle =
jniWrapper.nativeCreateKernelWithIterator(
@@ -73,7 +74,8 @@ public class NativePlanEvaluator {
partitionIndex, // TaskContext.getPartitionId(),
TaskContext.get().taskAttemptId(),
DebugUtil.isDumpingEnabledForTask(),
- spillDirPath);
+ spillDirPath,
+ enableCudf);
final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle);
runtime
.memoryManager()
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index ebd23d6f64..39a543422d 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -66,6 +66,7 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
int partitionId,
long taskId,
boolean enableDumping,
- String spillDir)
+ String spillDir,
+ boolean enableCudf)
throws RuntimeException;
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index efb793839f..637cc43ae7 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -75,7 +75,8 @@ trait IteratorApi {
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
- inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
+ inputIterators: Seq[Iterator[ColumnarBatch]] = Seq(),
+ enableCudf: Boolean = false
): Iterator[ColumnarBatch]
/**
@@ -92,6 +93,7 @@ trait IteratorApi {
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
- materializeInput: Boolean = false): Iterator[ColumnarBatch]
+ materializeInput: Boolean = false,
+ enableCudf: Boolean = false): Iterator[ColumnarBatch]
// scalastyle:on argcount
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index f562d1c127..839708d810 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -106,6 +106,8 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def enableCountDistinctWithoutExpand: Boolean =
getConf(ENABLE_COUNT_DISTINCT_WITHOUT_EXPAND)
+ def enableColumnarCudf: Boolean = getConf(COLUMNAR_CUDF_ENABLED)
+
def enableExtendedColumnPruning: Boolean =
getConf(ENABLE_EXTENDED_COLUMN_PRUNING)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index 011c34865b..d8de26c15d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -56,7 +56,8 @@ class GlutenWholeStageColumnarRDD(
var rdds: ColumnarInputRDDsWrapper,
pipelineTime: SQLMetric,
updateInputMetrics: InputMetricsWrapper => Unit,
- updateNativeMetrics: IMetrics => Unit)
+ updateNativeMetrics: IMetrics => Unit,
+ enableCudf: Boolean = false)
extends RDD[ColumnarBatch](sc, rdds.getDependencies) {
private val numaBindingInfo = GlutenConfig.get.numaBindingInfo
@@ -73,7 +74,8 @@ class GlutenWholeStageColumnarRDD(
updateInputMetrics,
updateNativeMetrics,
split.index,
- inputIterators
+ inputIterators,
+ enableCudf
)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 15655f79c8..3bdd6eba44 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -34,6 +34,7 @@ import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -49,10 +50,14 @@ import scala.collection.mutable.ArrayBuffer
case class TransformContext(outputAttributes: Seq[Attribute], root: RelNode)
-case class WholeStageTransformContext(root: PlanNode, substraitContext:
SubstraitContext = null)
+case class WholeStageTransformContext(
+ root: PlanNode,
+ substraitContext: SubstraitContext = null,
+ enableCudf: Boolean = false)
/** Base interface for a query plan that can be interpreted to Substrait
representation. */
trait TransformSupport extends ValidatablePlan {
+
override def batchType(): Convention.BatchType = {
BackendsApiManager.getSettings.primaryBatchType
}
@@ -66,6 +71,15 @@ trait TransformSupport extends ValidatablePlan {
s"${this.getClass.getSimpleName} doesn't support doExecute")
}
+ protected def isCudf: Boolean =
getTagValue[Boolean](CudfTag.CudfTag).getOrElse(false)
+
+  // Calling super.nodeName fails to compile under Scala 2.13 ("Super calls can
only target methods") for FileSourceScan, so build the name from the class name.
+ override def nodeName: String =
+ if (isCudf) {
+ "Cudf" + getClass.getSimpleName.replaceAll("Exec$", "")
+ } else getClass.getSimpleName
+
/**
* Returns all the RDDs of ColumnarBatch which generates the input rows.
*
@@ -245,7 +259,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
PlanBuilder.makePlan(substraitContext,
Lists.newArrayList(childCtx.root), outNames)
}
- WholeStageTransformContext(planNode, substraitContext)
+ WholeStageTransformContext(planNode, substraitContext, isCudf)
}
def doWholeStageTransform(): WholeStageTransformContext = {
@@ -346,7 +360,8 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
wsCtx.substraitContext.registeredRelMap,
wsCtx.substraitContext.registeredJoinParams,
wsCtx.substraitContext.registeredAggregationParams
- )
+ ),
+ wsCtx.enableCudf
)
SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
@@ -534,3 +549,7 @@ class ColumnarInputRDDsWrapper(columnarInputRDDs:
Seq[RDD[ColumnarBatch]]) exten
}
}
}
+
+object CudfTag {
+ val CudfTag = TreeNodeTag[Boolean]("org.apache.gluten.CudfTag")
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
index 1bb19cfd8b..2c91057509 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
@@ -56,7 +56,8 @@ class WholeStageZippedPartitionsRDD(
pipelineTime,
updateNativeMetrics,
split.index,
- materializeInput
+ materializeInput,
+ resCtx.enableCudf
)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]