This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new df7d62c256 [GLUTEN-8851][VL] GPU validate the plan and use runtime 
config to enable it (#9634)
df7d62c256 is described below

commit df7d62c25613f1be936a91358dd303f8b95d5ed5
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Jun 30 17:02:31 2025 +0100

    [GLUTEN-8851][VL] GPU validate the plan and use runtime config to enable it 
(#9634)
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  6 +-
 .../cudf/VeloxCudfPlanValidatorJniWrapper.java     | 22 ++++++
 .../backendsapi/velox/VeloxIteratorApi.scala       | 12 ++--
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  3 +
 .../gluten/extension/CudfNodeValidationRule.scala  | 49 +++++++++++++
 cpp/core/compute/Runtime.h                         |  1 -
 cpp/core/config/GlutenConfig.h                     |  8 +++
 cpp/core/jni/JniWrapper.cc                         | 10 ++-
 cpp/velox/CMakeLists.txt                           |  3 +
 cpp/velox/benchmarks/GenericBenchmark.cc           |  5 +-
 cpp/velox/compute/WholeStageResultIterator.cc      | 11 ++-
 cpp/velox/config/VeloxConfig.h                     |  8 ---
 cpp/velox/cudf/CudfPlanValidator.cc                | 81 ++++++++++++++++++++++
 cpp/velox/cudf/CudfPlanValidator.h                 | 29 ++++++++
 cpp/velox/jni/VeloxJniWrapper.cc                   | 20 ++++++
 cpp/velox/operators/plannodes/RowVectorStream.h    |  3 +-
 cpp/velox/tests/RuntimeTest.cc                     |  2 +-
 .../gluten/vectorized/NativePlanEvaluator.java     |  6 +-
 .../gluten/vectorized/PlanEvaluatorJniWrapper.java |  3 +-
 .../apache/gluten/backendsapi/IteratorApi.scala    |  6 +-
 .../org/apache/gluten/config/GlutenConfig.scala    |  2 +
 .../execution/GlutenWholeStageColumnarRDD.scala    |  6 +-
 .../gluten/execution/WholeStageTransformer.scala   | 25 ++++++-
 .../execution/WholeStageZippedPartitionsRDD.scala  |  3 +-
 24 files changed, 291 insertions(+), 33 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index edf354b0c3..a31384a8b2 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -281,7 +281,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       updateInputMetrics: InputMetricsWrapper => Unit,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
-      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
+      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq(),
+      enableCudf: Boolean = false
   ): Iterator[ColumnarBatch] = {
 
     require(
@@ -321,7 +322,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       pipelineTime: SQLMetric,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
-      materializeInput: Boolean): Iterator[ColumnarBatch] = {
+      materializeInput: Boolean,
+      enableCudf: Boolean): Iterator[ColumnarBatch] = {
     // scalastyle:on argcount
 
     // Final iterator does not contain scan split, so pass empty split info to 
native here.
diff --git 
a/backends-velox/src/main/java/org/apache/gluten/cudf/VeloxCudfPlanValidatorJniWrapper.java
 
b/backends-velox/src/main/java/org/apache/gluten/cudf/VeloxCudfPlanValidatorJniWrapper.java
new file mode 100644
index 0000000000..8ea0dad2d6
--- /dev/null
+++ 
b/backends-velox/src/main/java/org/apache/gluten/cudf/VeloxCudfPlanValidatorJniWrapper.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.cudf;
+
+/** The jni file is at `cpp/velox/jni/VeloxJniWrapper.cc` */
+public class VeloxCudfPlanValidatorJniWrapper {
+  public static native boolean validate(byte[] wsPlan);
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 726c694e7f..c8aa07728f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -228,7 +228,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       updateInputMetrics: InputMetricsWrapper => Unit,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
-      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): 
Iterator[ColumnarBatch] = {
+      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq(),
+      enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
     assert(
       inputPartition.isInstanceOf[GlutenPartition],
       "Velox backend only accept GlutenPartition.")
@@ -253,7 +254,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         splitInfoByteArray,
         columnarNativeIterators,
         partitionIndex,
-        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
+        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
+        enableCudf
       )
     val itrMetrics = IteratorMetricsJniWrapper.create()
 
@@ -283,7 +285,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       pipelineTime: SQLMetric,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
-      materializeInput: Boolean): Iterator[ColumnarBatch] = {
+      materializeInput: Boolean,
+      enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
 
     ExecutorManager.tryTaskSet(numaBindingInfo)
 
@@ -304,7 +307,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         new Array[Array[Byte]](0),
         columnarNativeIterator,
         partitionIndex,
-        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
+        
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
+        enableCudf
       )
     val itrMetrics = IteratorMetricsJniWrapper.create()
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index c750db1b9a..87d61f41b0 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -118,6 +118,8 @@ object VeloxRuleApi {
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPost(c => each(c.session)))
     injector.injectPost(c => ColumnarCollapseTransformStages(new 
GlutenConfig(c.sqlConf)))
+    injector.injectPost(c => CudfNodeValidationRule(new 
GlutenConfig(c.sqlConf)))
+
     injector.injectPost(c => GlutenNoopWriterRule(c.session))
 
     // Gluten columnar: Final rules.
@@ -206,6 +208,7 @@ object VeloxRuleApi {
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPostTransform(c => each(c.session)))
     injector.injectPostTransform(c => ColumnarCollapseTransformStages(new 
GlutenConfig(c.sqlConf)))
+    injector.injectPostTransform(c => CudfNodeValidationRule(new 
GlutenConfig(c.sqlConf)))
     injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
     injector.injectPostTransform(c => 
RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectPostTransform(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
new file mode 100644
index 0000000000..aeb21dd049
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper
+import org.apache.gluten.execution.{CudfTag, LeafTransformSupport, 
TransformSupport, WholeStageTransformer}
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+// Add the node name prefix 'Cudf' to a GlutenPlan node when it can be offloaded to cudf.
+case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends 
Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!glutenConf.enableColumnarCudf) {
+      return plan
+    }
+    plan.transformUp {
+      case transformer: WholeStageTransformer =>
+        if (
+          VeloxCudfPlanValidatorJniWrapper.validate(
+            transformer.substraitPlan.toProtobuf.toByteArray)
+        ) {
+          transformer.foreach {
+            case _: LeafTransformSupport =>
+            case t: TransformSupport =>
+              t.setTagValue(CudfTag.CudfTag, true)
+            case _ =>
+          }
+        }
+        transformer
+    }
+  }
+}
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index c9962642ea..00498a9e54 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -94,7 +94,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
     throw GlutenException("Not implemented");
   }
 
-  // Just for benchmark
   ::substrait::Plan& getPlan() {
     return substraitPlan_;
   }
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 578d8f66a5..5fb9040e32 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -82,6 +82,14 @@ const std::string kSparkMapKeyDedupPolicy = 
"spark.sql.mapKeyDedupPolicy";
 
 const std::string kSparkLegacyStatisticalAggregate = 
"spark.sql.legacy.statisticalAggregate";
 
+// cudf: GPU offload switches. Defaults must be bool literals; a string literal converts to true.
+#ifdef GLUTEN_ENABLE_GPU
+const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
+const bool kCudfEnabledDefault = false;
+const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
+const bool kDebugCudfDefault = false;
+#endif
+
 std::unordered_map<std::string, std::string>
 parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t 
planDataLength);
 
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 1d2bddfcff..0419436ef1 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -400,11 +400,17 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
     jint partitionId,
     jlong taskId,
     jboolean enableDumping,
-    jstring spillDir) {
+    jstring spillDir,
+    jboolean enableCudf) {
   JNI_METHOD_START
 
   auto ctx = getRuntime(env, wrapper);
-  auto& conf = ctx->getConfMap();
+  auto conf = ctx->getConfMap();
+#ifdef GLUTEN_ENABLE_GPU
+  if (enableCudf) {
+    conf[kCudfEnabled] = "true";
+  }
+#endif
 
   ctx->setSparkTaskInfo({stageId, partitionId, taskId});
 
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 213a33dc64..928d2c875b 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -177,6 +177,9 @@ if(ENABLE_S3)
   find_package(ZLIB)
 endif()
 
+if(ENABLE_GPU)
+  list(APPEND VELOX_SRCS cudf/CudfPlanValidator.cc)
+endif()
 add_library(velox SHARED ${VELOX_SRCS})
 
 if(ENABLE_GLUTEN_VCPKG AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index ad591c91fb..7e032e6c6d 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -444,8 +444,9 @@ auto BM_Generic = [](::benchmark::State& state,
       std::vector<FileReaderIterator*> inputItersRaw;
       if (!dataFiles.empty()) {
         for (const auto& input : dataFiles) {
-          
inputIters.push_back(FileReaderIterator::getInputIteratorFromFileReader(
-              readerType, input, FLAGS_batch_size, 
runtime->memoryManager()->getLeafMemoryPool()));
+          inputIters.push_back(
+              FileReaderIterator::getInputIteratorFromFileReader(
+                  readerType, input, FLAGS_batch_size, 
runtime->memoryManager()->getLeafMemoryPool()));
         }
         std::transform(
             inputIters.begin(),
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index c60a81d4c9..9cd4233bd5 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -21,6 +21,9 @@
 #include "velox/connectors/hive/HiveConfig.h"
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/exec/PlanNodeStats.h"
+#ifdef GLUTEN_ENABLE_GPU
+#include "velox/experimental/cudf/exec/ToCudf.h"
+#endif
 
 using namespace facebook;
 
@@ -75,7 +78,6 @@ WholeStageResultIterator::WholeStageResultIterator(
   if (spillThreadNum > 0) {
     spillExecutor_ = 
std::make_shared<folly::CPUThreadPoolExecutor>(spillThreadNum);
   }
-
   getOrderedNodeIds(veloxPlan_, orderedNodeIds_);
 
   // Create task instance.
@@ -571,6 +573,13 @@ std::unordered_map<std::string, std::string> 
WholeStageResultIterator::getQueryC
     configs[velox::core::QueryConfig::kSparkLegacyStatisticalAggregate] =
         std::to_string(veloxCfg_->get<bool>(kSparkLegacyStatisticalAggregate, 
false));
 
+#ifdef GLUTEN_ENABLE_GPU
+    if (veloxCfg_->get<bool>(kCudfEnabled, false)) {
+      // TODO: wait for PR 
https://github.com/facebookincubator/velox/pull/13341
+      // configs[cudf_velox::kCudfEnabled] = "false";
+    }
+#endif
+
     const auto setIfExists = [&](const std::string& glutenKey, const 
std::string& veloxKey) {
       const auto valueOptional = veloxCfg_->get<std::string>(glutenKey);
       if (valueOptional.hasValue()) {
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index e837314199..38752a8c14 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -146,14 +146,6 @@ const uint32_t kGlogVerboseLevelMaximum = 99;
 const std::string kGlogSeverityLevel = 
"spark.gluten.sql.columnar.backend.velox.glogSeverityLevel";
 const uint32_t kGlogSeverityLevelDefault = 1;
 
-// cudf
-#ifdef GLUTEN_ENABLE_GPU
-const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
-const bool kCudfEnabledDefault = "false";
-const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
-const bool kDebugCudfDefault = "false";
-#endif
-
 // Query trace
 /// Enable query tracing flag.
 const std::string kQueryTraceEnabled = 
"spark.gluten.sql.columnar.backend.velox.queryTraceEnabled";
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc 
b/cpp/velox/cudf/CudfPlanValidator.cc
new file mode 100644
index 0000000000..177613e8cc
--- /dev/null
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -0,0 +1,81 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CudfPlanValidator.h"
+#include "compute/ResultIterator.h"
+#include "compute/VeloxBackend.h"
+#include "compute/VeloxPlanConverter.h"
+#include "operators/plannodes/RowVectorStream.h"
+#include "velox/core/PlanNode.h"
+#include "velox/exec/Task.h"
+#include "velox/experimental/cudf/exec/ToCudf.h"
+
+using namespace facebook;
+
+namespace gluten {
+bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) {
+  auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
+  std::vector<::substrait::ReadRel_LocalFiles> localFiles;
+  std::unordered_map<std::string, std::string> configValues;
+  std::vector<std::shared_ptr<ResultIterator>> inputs;
+  std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg =
+      
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
+  VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), 
veloxCfg.get(), std::nullopt, true);
+  auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles);
+  std::unordered_set<velox::core::PlanNodeId> emptySet;
+  velox::core::PlanFragment planFragment{planNode, 
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
+
+  std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> 
connectorConfigs;
+  static std::atomic<uint32_t> vtId{0};
+  std::shared_ptr<velox::core::QueryCtx> queryCtx = 
velox::core::QueryCtx::create(
+      nullptr,
+      facebook::velox::core::QueryConfig{configValues},
+      connectorConfigs,
+      gluten::VeloxBackend::get()->getAsyncDataCache(),
+      getDefaultMemoryManager()->getAggregateMemoryPool(),
+      nullptr,
+      fmt::format("Gluten_Cudf_Validation_VTID_{}", std::to_string(vtId++)));
+  std::shared_ptr<facebook::velox::exec::Task> task = 
velox::exec::Task::create(
+      fmt::format("Gluten_Cudf_Validation_VTID_{}", std::to_string(vtId++)),
+      std::move(planFragment),
+      0,
+      std::move(queryCtx),
+      velox::exec::Task::ExecutionMode::kSerial);
+  std::vector<velox::exec::Operator*> operators;
+  task->testingVisitDrivers([&](velox::exec::Driver* driver) { operators = 
driver->operators(); });
+  for (const auto* op : operators) {
+    if (dynamic_cast<const exec::TableScan*>(op) != nullptr) {
+      continue;
+    }
+    // TODO: wait for PR https://github.com/facebookincubator/velox/pull/13341
+    // if (cudf_velox::isCudfOperator(op)) {
+    //   continue;
+    // }
+    if (dynamic_cast<const ValueStream*>(op) != nullptr) {
+      continue;
+    }
+    LOG(INFO) << "Operator " << op->operatorType() << " is not supported in 
cudf";
+    task->requestCancel().wait();
+    return false;
+  }
+  task->requestCancel().wait();
+  LOG(INFO) << "Cudf Operator validation success";
+  return true;
+}
+
+} // namespace gluten
diff --git a/cpp/velox/cudf/CudfPlanValidator.h 
b/cpp/velox/cudf/CudfPlanValidator.h
new file mode 100644
index 0000000000..de4fb150e0
--- /dev/null
+++ b/cpp/velox/cudf/CudfPlanValidator.h
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "memory/VeloxMemoryManager.h"
+#include "substrait/plan.pb.h"
+
+namespace gluten {
+class CudfPlanValidator {
+ public:
+  // Validates that the plan contains no cudf-unsupported operators, other than TableScan 
and ValueStream.
+  static bool validate(const ::substrait::Plan& substraitPlan);
+};
+} // namespace gluten
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index fdadf13ba8..c372f78d15 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -37,6 +37,10 @@
 #include "velox/common/base/BloomFilter.h"
 #include "velox/common/file/FileSystems.h"
 
+#ifdef GLUTEN_ENABLE_GPU
+#include "cudf/CudfPlanValidator.h"
+#endif
+
 using namespace gluten;
 using namespace facebook;
 
@@ -590,6 +594,22 @@ JNIEXPORT jboolean JNICALL 
Java_org_apache_gluten_config_ConfigJniWrapper_isEnha
 #endif
 }
 
+#ifdef GLUTEN_ENABLE_GPU
+JNIEXPORT jboolean JNICALL 
Java_org_apache_gluten_cudf_VeloxCudfPlanValidatorJniWrapper_validate( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jbyteArray planArr) {
+  JNI_METHOD_START
+  auto safePlanArray = getByteArrayElementsSafe(env, planArr);
+  auto planSize = env->GetArrayLength(planArr);
+  ::substrait::Plan substraitPlan;
+  parseProtobuf(safePlanArray.elems(), planSize, &substraitPlan);
+  // Get the task and driver and validate the plan: returns true if every operator except 
table scan is offloaded.
+  return CudfPlanValidator::validate(substraitPlan);
+  JNI_METHOD_END(false)
+}
+#endif
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h 
b/cpp/velox/operators/plannodes/RowVectorStream.h
index d503eb2d14..0c56112450 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -61,6 +61,8 @@ class RowVectorStream {
     if (finished_) {
       return false;
     }
+    VELOX_DCHECK_NOT_NULL(iterator_);
+
     bool hasNext;
     {
       // We are leaving Velox task execution and are probably entering Spark 
code through JNI. Suspend the current
@@ -157,7 +159,6 @@ class ValueStream : public 
facebook::velox::exec::SourceOperator {
             valueStreamNode->id(),
             valueStreamNode->name().data()) {
     ResultIterator* itr = valueStreamNode->iterator();
-    VELOX_CHECK_NOT_NULL(itr);
     rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr, 
outputType_);
   }
 
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index ec9aee855b..2241ddcdaa 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -25,7 +25,7 @@ namespace gluten {
 
 class DummyMemoryManager final : public MemoryManager {
  public:
-  DummyMemoryManager(const std::string& kind) : MemoryManager(kind){};
+  DummyMemoryManager(const std::string& kind) : MemoryManager(kind) {};
 
   arrow::MemoryPool* defaultArrowMemoryPool() override {
     throw GlutenException("Not yet implemented");
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 7fefb00c22..96643dd711 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -62,7 +62,8 @@ public class NativePlanEvaluator {
       byte[][] splitInfo,
       List<ColumnarBatchInIterator> iterList,
       int partitionIndex,
-      String spillDirPath)
+      String spillDirPath,
+      boolean enableCudf)
       throws RuntimeException {
     final long itrHandle =
         jniWrapper.nativeCreateKernelWithIterator(
@@ -73,7 +74,8 @@ public class NativePlanEvaluator {
             partitionIndex, // TaskContext.getPartitionId(),
             TaskContext.get().taskAttemptId(),
             DebugUtil.isDumpingEnabledForTask(),
-            spillDirPath);
+            spillDirPath,
+            enableCudf);
     final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle);
     runtime
         .memoryManager()
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index ebd23d6f64..39a543422d 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -66,6 +66,7 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
       int partitionId,
       long taskId,
       boolean enableDumping,
-      String spillDir)
+      String spillDir,
+      boolean enableCudf)
       throws RuntimeException;
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index efb793839f..637cc43ae7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -75,7 +75,8 @@ trait IteratorApi {
       updateInputMetrics: InputMetricsWrapper => Unit,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
-      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
+      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq(),
+      enableCudf: Boolean = false
   ): Iterator[ColumnarBatch]
 
   /**
@@ -92,6 +93,7 @@ trait IteratorApi {
       pipelineTime: SQLMetric,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
-      materializeInput: Boolean = false): Iterator[ColumnarBatch]
+      materializeInput: Boolean = false,
+      enableCudf: Boolean = false): Iterator[ColumnarBatch]
   // scalastyle:on argcount
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index f562d1c127..839708d810 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -106,6 +106,8 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
   def enableCountDistinctWithoutExpand: Boolean =
     getConf(ENABLE_COUNT_DISTINCT_WITHOUT_EXPAND)
 
+  def enableColumnarCudf: Boolean = getConf(COLUMNAR_CUDF_ENABLED)
+
   def enableExtendedColumnPruning: Boolean =
     getConf(ENABLE_EXTENDED_COLUMN_PRUNING)
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index 011c34865b..d8de26c15d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -56,7 +56,8 @@ class GlutenWholeStageColumnarRDD(
     var rdds: ColumnarInputRDDsWrapper,
     pipelineTime: SQLMetric,
     updateInputMetrics: InputMetricsWrapper => Unit,
-    updateNativeMetrics: IMetrics => Unit)
+    updateNativeMetrics: IMetrics => Unit,
+    enableCudf: Boolean = false)
   extends RDD[ColumnarBatch](sc, rdds.getDependencies) {
   private val numaBindingInfo = GlutenConfig.get.numaBindingInfo
 
@@ -73,7 +74,8 @@ class GlutenWholeStageColumnarRDD(
           updateInputMetrics,
           updateNativeMetrics,
           split.index,
-          inputIterators
+          inputIterators,
+          enableCudf
         )
     }
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 15655f79c8..3bdd6eba44 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -34,6 +34,7 @@ import org.apache.spark.softaffinity.SoftAffinity
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -49,10 +50,14 @@ import scala.collection.mutable.ArrayBuffer
 
 case class TransformContext(outputAttributes: Seq[Attribute], root: RelNode)
 
-case class WholeStageTransformContext(root: PlanNode, substraitContext: 
SubstraitContext = null)
+case class WholeStageTransformContext(
+    root: PlanNode,
+    substraitContext: SubstraitContext = null,
+    enableCudf: Boolean = false)
 
 /** Base interface for a query plan that can be interpreted to Substrait 
representation. */
 trait TransformSupport extends ValidatablePlan {
+
   override def batchType(): Convention.BatchType = {
     BackendsApiManager.getSettings.primaryBatchType
   }
@@ -66,6 +71,15 @@ trait TransformSupport extends ValidatablePlan {
       s"${this.getClass.getSimpleName} doesn't support doExecute")
   }
 
+  protected def isCudf: Boolean = 
getTagValue[Boolean](CudfTag.CudfTag).getOrElse(false)
+
+  // Calling super.nodeName fails on Scala 2.13 with "Super calls can only 
target methods"
+  // for FileSourceScan, so the name is derived from the class name instead.
+  override def nodeName: String =
+    if (isCudf) {
+      "Cudf" + getClass.getSimpleName.replaceAll("Exec$", "")
+    } else getClass.getSimpleName
+
   /**
    * Returns all the RDDs of ColumnarBatch which generates the input rows.
    *
@@ -245,7 +259,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       PlanBuilder.makePlan(substraitContext, 
Lists.newArrayList(childCtx.root), outNames)
     }
 
-    WholeStageTransformContext(planNode, substraitContext)
+    WholeStageTransformContext(planNode, substraitContext, isCudf)
   }
 
   def doWholeStageTransform(): WholeStageTransformContext = {
@@ -346,7 +360,8 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
         wsCtx.substraitContext.registeredRelMap,
         wsCtx.substraitContext.registeredJoinParams,
         wsCtx.substraitContext.registeredAggregationParams
-      )
+      ),
+      wsCtx.enableCudf
     )
 
     SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
@@ -534,3 +549,7 @@ class ColumnarInputRDDsWrapper(columnarInputRDDs: 
Seq[RDD[ColumnarBatch]]) exten
     }
   }
 }
+
+object CudfTag {
+  val CudfTag = TreeNodeTag[Boolean]("org.apache.gluten.CudfTag")
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
index 1bb19cfd8b..2c91057509 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
@@ -56,7 +56,8 @@ class WholeStageZippedPartitionsRDD(
             pipelineTime,
             updateNativeMetrics,
             split.index,
-            materializeInput
+            materializeInput,
+            resCtx.enableCudf
           )
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to