This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 001a126f78 [GLUTEN-10578] Remove `NUMA Binding` feature (#10579)
001a126f78 is described below

commit 001a126f78057e9451ffeb25e83c83d5011dadf3
Author: Jiaan Geng <[email protected]>
AuthorDate: Sat Aug 30 03:48:06 2025 +0800

    [GLUTEN-10578] Remove `NUMA Binding` feature (#10579)
    
    This PR removes the experimental NUMA binding feature.
    
    Fixes #10578
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  2 -
 .../backendsapi/velox/VeloxIteratorApi.scala       |  6 +--
 docs/Configuration.md                              |  6 +--
 .../apache/gluten/backendsapi/IteratorApi.scala    |  2 -
 .../org/apache/gluten/config/GlutenConfig.scala    | 34 ------------
 .../execution/GlutenWholeStageColumnarRDD.scala    |  4 --
 .../gluten/execution/WholeStageTransformer.scala   |  1 -
 .../execution/WholeStageZippedPartitionsRDD.scala  |  3 --
 .../org/apache/spark/util/ExecutorManager.scala    | 62 ----------------------
 9 files changed, 3 insertions(+), 117 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index a31384a8b2..86560b3d36 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.backendsapi.IteratorApi
-import org.apache.gluten.config.GlutenNumaBindingInfo
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.logging.LogLevelUtil
@@ -316,7 +315,6 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
   override def genFinalStageIterator(
       context: TaskContext,
       inputIterators: Seq[Iterator[ColumnarBatch]],
-      numaBindingInfo: GlutenNumaBindingInfo,
       sparkConf: SparkConf,
       rootNode: PlanNode,
       pipelineTime: SQLMetric,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index fa713549fd..ee1889d340 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.{BackendsApiManager, IteratorApi}
 import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
-import org.apache.gluten.config.GlutenNumaBindingInfo
 import org.apache.gluten.execution._
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
@@ -39,7 +38,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
+import org.apache.spark.util.SparkDirectoryUtil
 
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
@@ -279,7 +278,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   override def genFinalStageIterator(
       context: TaskContext,
       inputIterators: Seq[Iterator[ColumnarBatch]],
-      numaBindingInfo: GlutenNumaBindingInfo,
       sparkConf: SparkConf,
       rootNode: PlanNode,
       pipelineTime: SQLMetric,
@@ -288,8 +286,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       materializeInput: Boolean,
       enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
 
-    ExecutorManager.tryTaskSet(numaBindingInfo)
-
     val transKernel = 
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
     val columnarNativeIterator =
       new JArrayList[ColumnarBatchInIterator](inputIterators.map {
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 7b9bcbe49a..c0853ccb01 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -55,6 +55,7 @@ nav_order: 15
 | spark.gluten.soft-affinity.min.target-hosts                        | 1       
          | For on HDFS, if there are already target hosts, and then prefer to 
use the original target hosts to schedule                                       
                                                                                
                                                                                
                                                                                
               [...]
 | spark.gluten.soft-affinity.replications.num                        | 2       
          | Calculate the number of the replications for scheduling to the 
target executors per file                                                       
                                                                                
                                                                                
                                                                                
                   [...]
 | spark.gluten.sql.adaptive.costEvaluator.enabled                    | true    
          | If true, use 
org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator as custom cost 
evaluator class, else follow the configuration 
spark.sql.adaptive.customCostEvaluatorClass.                                    
                                                                                
                                                                                
                           [...]
+| spark.gluten.sql.ansiFallback.enabled                              | true    
          | When true (default), Gluten will fall back to Spark when ANSI mode 
is enabled. When false, Gluten will attempt to execute in ANSI mode.            
                                                                                
                                                                                
                                                                                
               [...]
 | spark.gluten.sql.benchmark_task.partitionId                                  
         ||
 | spark.gluten.sql.benchmark_task.stageId                            | -1      
          |
 | spark.gluten.sql.benchmark_task.taskId                                       
         ||
@@ -71,7 +72,6 @@ nav_order: 15
 | spark.gluten.sql.columnar.coalesce                                 | true    
          | Enable or disable columnar coalesce.                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.collectLimit                             | true    
          | Enable or disable columnar collectLimit.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.collectTail                              | true    
          | Enable or disable columnar collectTail.                             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.coreRange                                | 
&lt;undefined&gt; |
 | spark.gluten.sql.columnar.cudf                                     | false   
          | Enable or disable cudf support. This is an experimental feature.    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.enableNestedColumnPruningInHiveTableScan | true    
          | Enable or disable nested column pruning in hivetablescan.           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.enableVanillaVectorizedReaders           | true    
          | Enable or disable vanilla vectorized scan.                          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
@@ -95,7 +95,6 @@ nav_order: 15
 | spark.gluten.sql.columnar.libpath                                            
         || The gluten library path.                                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.limit                                    | true    
          |
 | spark.gluten.sql.columnar.maxBatchSize                             | 4096    
          |
-| spark.gluten.sql.columnar.numaBinding                              | false   
          |
 | spark.gluten.sql.columnar.overwriteByExpression                    | true    
          | Enable or disable columnar v2 command overwrite by expression.      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.parquet.write.blockSize                  | 128MB   
          |
 | spark.gluten.sql.columnar.partial.project                          | true    
          | Break up one project node into 2 phases when some of the 
expressions are non offload-able. Phase one is a regular offloaded project 
transformer that evaluates the offload-able expressions in native, phase two 
preserves the output from phase one and evaluates the remaining 
non-offload-able expressions using vanilla Spark projections                    
                                                 [...]
@@ -111,7 +110,7 @@ nav_order: 15
 | spark.gluten.sql.columnar.shuffle                                  | true    
          | Enable or disable columnar shuffle.                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled        | true    
          | If enabled, fall back to ColumnarShuffleManager when celeborn 
service is unavailable.Otherwise, throw an exception.                           
                                                                                
                                                                                
                                                                                
                    [...]
 | spark.gluten.sql.columnar.shuffle.celeborn.useRssSort              | true    
          | If true, use RSS sort implementation for Celeborn sort-based 
shuffle.If false, use Gluten's row-based sort implementation. Only valid when 
`spark.celeborn.client.spark.shuffle.writer` is set to `sort`.                  
                                                                                
                                                                                
                       [...]
-| spark.gluten.sql.columnar.shuffle.codec                            | 
&lt;undefined&gt; | By default, the supported codecs are lz4 and zstd. When 
spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are 
gzip and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=iaa,the 
supported codec is gzip.                                                        
                                                                                
                                   [...]
+| spark.gluten.sql.columnar.shuffle.codec                            | 
&lt;undefined&gt; | By default, the supported codecs are lz4 and zstd. When 
spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are 
gzip and zstd.                                                                  
                                                                                
                                                                                
                              [...]
 | spark.gluten.sql.columnar.shuffle.codecBackend                     | 
&lt;undefined&gt; |
 | spark.gluten.sql.columnar.shuffle.compression.threshold            | 100     
          | If number of rows in a batch falls below this threshold, will copy 
all buffers into one buffer to compress.                                        
                                                                                
                                                                                
                                                                                
               [...]
 | spark.gluten.sql.columnar.shuffle.compressionMode                  | buffer  
          | buffer means compress each buffer to pre allocated big 
buffer,rowvector means to copy the buffers to a big buffer, and then compress 
the buffer                                                                      
                                                                                
                                                                                
                             [...]
@@ -141,7 +140,6 @@ nav_order: 15
 | spark.gluten.sql.debug.cudf                                        | false   
          |
 | spark.gluten.sql.debug.keepJniWorkspace                            | false   
          |
 | spark.gluten.sql.debug.keepJniWorkspaceDir                         | /tmp    
          |
-| spark.gluten.sql.ansiFallback.enabled                              | true    
          | When true (default), Gluten will fallback to Spark when ANSI mode 
is enabled. When false, Gluten will attempt to execute in ANSI mode.            
                                                                                
                                                                                
                                                                                
                [...]
 | spark.gluten.sql.enable.native.validation                          | true    
          | This is tmp config to specify whether to enable the native 
validation based on Substrait plan. After the validations in all backends are 
correctly implemented, this config should be removed.                           
                                                                                
                                                                                
                         [...]
 | spark.gluten.sql.extendedColumnPruning.enabled                     | true    
          | Do extended nested column pruning for cases ignored by vanilla 
Spark.                                                                          
                                                                                
                                                                                
                                                                                
                   [...]
 | spark.gluten.sql.fallbackEncryptedParquet                          | false   
          | If enabled, gluten will not offload scan when encrypted parquet 
files are detected                                                              
                                                                                
                                                                                
                                                                                
                  [...]
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 637cc43ae7..0a19d2207d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.backendsapi
 
-import org.apache.gluten.config.GlutenNumaBindingInfo
 import org.apache.gluten.execution.{BaseGlutenPartition, LeafTransformSupport, 
WholeStageTransformContext}
 import org.apache.gluten.metrics.IMetrics
 import org.apache.gluten.substrait.plan.PlanNode
@@ -87,7 +86,6 @@ trait IteratorApi {
   def genFinalStageIterator(
       context: TaskContext,
       inputIterators: Seq[Iterator[ColumnarBatch]],
-      numaBindingInfo: GlutenNumaBindingInfo,
       sparkConf: SparkConf,
       rootNode: PlanNode,
       pipelineTime: SQLMetric,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index ef2bd560f8..7d85b01115 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -29,11 +29,6 @@ import java.util.Locale
 
 import scala.collection.JavaConverters._
 
-case class GlutenNumaBindingInfo(
-    enableNumaBinding: Boolean,
-    totalCoreRange: Array[String] = null,
-    numCoresPerExecutor: Int = -1) {}
-
 trait ShuffleWriterType {
   val name: String
 }
@@ -260,23 +255,6 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
 
   def fallbackPreferColumnar: Boolean = 
getConf(COLUMNAR_FALLBACK_PREFER_COLUMNAR)
 
-  def numaBindingInfo: GlutenNumaBindingInfo = {
-    val enableNumaBinding: Boolean = getConf(COLUMNAR_NUMA_BINDING_ENABLED)
-    if (!enableNumaBinding) {
-      GlutenNumaBindingInfo(enableNumaBinding = false)
-    } else {
-      val tmp = getConf(COLUMNAR_NUMA_BINDING_CORE_RANGE)
-      if (tmp.isEmpty) {
-        GlutenNumaBindingInfo(enableNumaBinding = false)
-      } else {
-        val numCores = conf.getConfString("spark.executor.cores", "1").toInt
-        val coreRangeList: Array[String] = tmp.get.split('|').map(_.trim)
-        GlutenNumaBindingInfo(enableNumaBinding = true, coreRangeList, 
numCores)
-      }
-
-    }
-  }
-
   def cartesianProductTransformerEnabled: Boolean =
     getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
 
@@ -1205,18 +1183,6 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val COLUMNAR_NUMA_BINDING_ENABLED =
-    buildConf("spark.gluten.sql.columnar.numaBinding")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
-  val COLUMNAR_NUMA_BINDING_CORE_RANGE =
-    buildConf("spark.gluten.sql.columnar.coreRange")
-      .internal()
-      .stringConf
-      .createOptional
-
   val COLUMNAR_MEMORY_BACKTRACE_ALLOCATION =
     buildConf("spark.gluten.memory.backtrace.allocation")
       .internal()
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index d8de26c15d..d332e3774a 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
 
 import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
@@ -26,7 +25,6 @@ import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.ExecutorManager
 
 trait BaseGlutenPartition extends Partition with InputPartition {
   def plan: Array[Byte]
@@ -59,12 +57,10 @@ class GlutenWholeStageColumnarRDD(
     updateNativeMetrics: IMetrics => Unit,
     enableCudf: Boolean = false)
   extends RDD[ColumnarBatch](sc, rdds.getDependencies) {
-  private val numaBindingInfo = GlutenConfig.get.numaBindingInfo
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[ColumnarBatch] = {
     GlutenTimeMetric.millis(pipelineTime) {
       _ =>
-        ExecutorManager.tryTaskSet(numaBindingInfo)
         val (inputPartition, inputColumnarRDDPartitions) = 
castNativePartition(split)
         val inputIterators = rdds.getIterators(inputColumnarRDDPartitions, 
context)
         BackendsApiManager.getIteratorApiInstance.genFirstStageIterator(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index bf0392dabc..0c5e1b58b1 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -447,7 +447,6 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       new WholeStageZippedPartitionsRDD(
         sparkContext,
         inputRDDs,
-        GlutenConfig.get.numaBindingInfo,
         sparkContext.getConf,
         wsCtx,
         pipelineTime,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
index 2c91057509..521388faae 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.GlutenNumaBindingInfo
 import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
 
 import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
@@ -33,7 +32,6 @@ private[gluten] class ZippedPartitionsPartition(
 class WholeStageZippedPartitionsRDD(
     @transient private val sc: SparkContext,
     var rdds: ColumnarInputRDDsWrapper,
-    numaBindingInfo: GlutenNumaBindingInfo,
     sparkConf: SparkConf,
     resCtx: WholeStageTransformContext,
     pipelineTime: SQLMetric,
@@ -50,7 +48,6 @@ class WholeStageZippedPartitionsRDD(
           .genFinalStageIterator(
             context,
             inputIterators,
-            numaBindingInfo,
             sparkConf,
             resCtx.root,
             pipelineTime,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/util/ExecutorManager.scala 
b/gluten-substrait/src/main/scala/org/apache/spark/util/ExecutorManager.scala
deleted file mode 100644
index 4e55aff0d3..0000000000
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/util/ExecutorManager.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.util
-
-import org.apache.gluten.config.GlutenNumaBindingInfo
-
-import org.apache.spark.{SparkContext, SparkEnv}
-
-import java.lang.management.ManagementFactory
-
-object ExecutorManager {
-  var isTaskSet: Boolean = false
-
-  def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds
-
-  def tryTaskSet(numaInfo: GlutenNumaBindingInfo): Any = synchronized {
-    if (numaInfo.enableNumaBinding && !isTaskSet) {
-      val cmd_output =
-        Utils.executeAndGetOutput(
-          Seq("bash", "-c", "ps -ef | grep YarnCoarseGrainedExecutorBackend"))
-      val getExecutorId = """--executor-id (\d+)""".r
-      val executorIdOnLocalNode = {
-        val tmp = for (m <- getExecutorId.findAllMatchIn(cmd_output)) yield 
m.group(1)
-        tmp.toList.distinct
-      }
-      val executorId = SparkEnv.get.executorId
-      val numCorePerExecutor = numaInfo.numCoresPerExecutor
-      val coreRange = numaInfo.totalCoreRange
-      val shouldBindNumaIdx = executorIdOnLocalNode.indexOf(executorId) % 
coreRange.size
-      // val coreStartIdx = coreRange(shouldBindNumaIdx)._1
-      // val coreEndIdx = coreRange(shouldBindNumaIdx)._2
-      // scalastyle:off println
-      System.out.println(
-        s"executorId is $executorId, executorIdOnLocalNode is 
$executorIdOnLocalNode")
-      val taskSetCmd = s"taskset -cpa ${coreRange(shouldBindNumaIdx)} 
${getProcessId()}"
-      System.out.println(taskSetCmd)
-      // scalastyle:on println
-      isTaskSet = true
-      Utils.executeCommand(Seq("bash", "-c", taskSetCmd))
-    }
-  }
-
-  def getProcessId(): Int = {
-    val runtimeMXBean = ManagementFactory.getRuntimeMXBean()
-    runtimeMXBean.getName().split("@")(0).toInt
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to