This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 001a126f78 [GLUTEN-10578] Remove `NUMA Binding` feature (#10579)
001a126f78 is described below
commit 001a126f78057e9451ffeb25e83c83d5011dadf3
Author: Jiaan Geng <[email protected]>
AuthorDate: Sat Aug 30 03:48:06 2025 +0800
[GLUTEN-10578] Remove `NUMA Binding` feature (#10579)
This PR removes the experimental NUMA binding feature.
Fixes #10578
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 2 -
.../backendsapi/velox/VeloxIteratorApi.scala | 6 +--
docs/Configuration.md | 6 +--
.../apache/gluten/backendsapi/IteratorApi.scala | 2 -
.../org/apache/gluten/config/GlutenConfig.scala | 34 ------------
.../execution/GlutenWholeStageColumnarRDD.scala | 4 --
.../gluten/execution/WholeStageTransformer.scala | 1 -
.../execution/WholeStageZippedPartitionsRDD.scala | 3 --
.../org/apache/spark/util/ExecutorManager.scala | 62 ----------------------
9 files changed, 3 insertions(+), 117 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index a31384a8b2..86560b3d36 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.IteratorApi
-import org.apache.gluten.config.GlutenNumaBindingInfo
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.logging.LogLevelUtil
@@ -316,7 +315,6 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
override def genFinalStageIterator(
context: TaskContext,
inputIterators: Seq[Iterator[ColumnarBatch]],
- numaBindingInfo: GlutenNumaBindingInfo,
sparkConf: SparkConf,
rootNode: PlanNode,
pipelineTime: SQLMetric,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index fa713549fd..ee1889d340 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.{BackendsApiManager, IteratorApi}
import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
-import org.apache.gluten.config.GlutenNumaBindingInfo
import org.apache.gluten.execution._
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
@@ -39,7 +38,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
+import org.apache.spark.util.SparkDirectoryUtil
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
@@ -279,7 +278,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
override def genFinalStageIterator(
context: TaskContext,
inputIterators: Seq[Iterator[ColumnarBatch]],
- numaBindingInfo: GlutenNumaBindingInfo,
sparkConf: SparkConf,
rootNode: PlanNode,
pipelineTime: SQLMetric,
@@ -288,8 +286,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
materializeInput: Boolean,
enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
- ExecutorManager.tryTaskSet(numaBindingInfo)
-
val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
val columnarNativeIterator =
new JArrayList[ColumnarBatchInIterator](inputIterators.map {
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 7b9bcbe49a..c0853ccb01 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -55,6 +55,7 @@ nav_order: 15
| spark.gluten.soft-affinity.min.target-hosts | 1
| For on HDFS, if there are already target hosts, and then prefer to
use the original target hosts to schedule
[...]
| spark.gluten.soft-affinity.replications.num | 2
| Calculate the number of the replications for scheduling to the
target executors per file
[...]
| spark.gluten.sql.adaptive.costEvaluator.enabled | true
| If true, use
org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator as custom cost
evaluator class, else follow the configuration
spark.sql.adaptive.customCostEvaluatorClass.
[...]
+| spark.gluten.sql.ansiFallback.enabled | true
| When true (default), Gluten will fall back to Spark when ANSI mode
is enabled. When false, Gluten will attempt to execute in ANSI mode.
[...]
| spark.gluten.sql.benchmark_task.partitionId
||
| spark.gluten.sql.benchmark_task.stageId | -1
|
| spark.gluten.sql.benchmark_task.taskId
||
@@ -71,7 +72,6 @@ nav_order: 15
| spark.gluten.sql.columnar.coalesce | true
| Enable or disable columnar coalesce.
[...]
| spark.gluten.sql.columnar.collectLimit | true
| Enable or disable columnar collectLimit.
[...]
| spark.gluten.sql.columnar.collectTail | true
| Enable or disable columnar collectTail.
[...]
-| spark.gluten.sql.columnar.coreRange |
<undefined> |
| spark.gluten.sql.columnar.cudf | false
| Enable or disable cudf support. This is an experimental feature.
[...]
| spark.gluten.sql.columnar.enableNestedColumnPruningInHiveTableScan | true
| Enable or disable nested column pruning in hivetablescan.
[...]
| spark.gluten.sql.columnar.enableVanillaVectorizedReaders | true
| Enable or disable vanilla vectorized scan.
[...]
@@ -95,7 +95,6 @@ nav_order: 15
| spark.gluten.sql.columnar.libpath
|| The gluten library path.
[...]
| spark.gluten.sql.columnar.limit | true
|
| spark.gluten.sql.columnar.maxBatchSize | 4096
|
-| spark.gluten.sql.columnar.numaBinding | false
|
| spark.gluten.sql.columnar.overwriteByExpression | true
| Enable or disable columnar v2 command overwrite by expression.
[...]
| spark.gluten.sql.columnar.parquet.write.blockSize | 128MB
|
| spark.gluten.sql.columnar.partial.project | true
| Break up one project node into 2 phases when some of the
expressions are non offload-able. Phase one is a regular offloaded project
transformer that evaluates the offload-able expressions in native, phase two
preserves the output from phase one and evaluates the remaining
non-offload-able expressions using vanilla Spark projections
[...]
@@ -111,7 +110,7 @@ nav_order: 15
| spark.gluten.sql.columnar.shuffle | true
| Enable or disable columnar shuffle.
[...]
| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true
| If enabled, fall back to ColumnarShuffleManager when celeborn
service is unavailable.Otherwise, throw an exception.
[...]
| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true
| If true, use RSS sort implementation for Celeborn sort-based
shuffle.If false, use Gluten's row-based sort implementation. Only valid when
`spark.celeborn.client.spark.shuffle.writer` is set to `sort`.
[...]
-| spark.gluten.sql.columnar.shuffle.codec |
<undefined> | By default, the supported codecs are lz4 and zstd. When
spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are
gzip and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=iaa,the
supported codec is gzip.
[...]
+| spark.gluten.sql.columnar.shuffle.codec |
<undefined> | By default, the supported codecs are lz4 and zstd. When
spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are
gzip and zstd.
[...]
| spark.gluten.sql.columnar.shuffle.codecBackend |
<undefined> |
| spark.gluten.sql.columnar.shuffle.compression.threshold | 100
| If number of rows in a batch falls below this threshold, will copy
all buffers into one buffer to compress.
[...]
| spark.gluten.sql.columnar.shuffle.compressionMode | buffer
| buffer means compress each buffer to pre allocated big
buffer,rowvector means to copy the buffers to a big buffer, and then compress
the buffer
[...]
@@ -141,7 +140,6 @@ nav_order: 15
| spark.gluten.sql.debug.cudf | false
|
| spark.gluten.sql.debug.keepJniWorkspace | false
|
| spark.gluten.sql.debug.keepJniWorkspaceDir | /tmp
|
-| spark.gluten.sql.ansiFallback.enabled | true
| When true (default), Gluten will fallback to Spark when ANSI mode
is enabled. When false, Gluten will attempt to execute in ANSI mode.
[...]
| spark.gluten.sql.enable.native.validation | true
| This is tmp config to specify whether to enable the native
validation based on Substrait plan. After the validations in all backends are
correctly implemented, this config should be removed.
[...]
| spark.gluten.sql.extendedColumnPruning.enabled | true
| Do extended nested column pruning for cases ignored by vanilla
Spark.
[...]
| spark.gluten.sql.fallbackEncryptedParquet | false
| If enabled, gluten will not offload scan when encrypted parquet
files are detected
[...]
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 637cc43ae7..0a19d2207d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.backendsapi
-import org.apache.gluten.config.GlutenNumaBindingInfo
import org.apache.gluten.execution.{BaseGlutenPartition, LeafTransformSupport,
WholeStageTransformContext}
import org.apache.gluten.metrics.IMetrics
import org.apache.gluten.substrait.plan.PlanNode
@@ -87,7 +86,6 @@ trait IteratorApi {
def genFinalStageIterator(
context: TaskContext,
inputIterators: Seq[Iterator[ColumnarBatch]],
- numaBindingInfo: GlutenNumaBindingInfo,
sparkConf: SparkConf,
rootNode: PlanNode,
pipelineTime: SQLMetric,
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index ef2bd560f8..7d85b01115 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -29,11 +29,6 @@ import java.util.Locale
import scala.collection.JavaConverters._
-case class GlutenNumaBindingInfo(
- enableNumaBinding: Boolean,
- totalCoreRange: Array[String] = null,
- numCoresPerExecutor: Int = -1) {}
-
trait ShuffleWriterType {
val name: String
}
@@ -260,23 +255,6 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def fallbackPreferColumnar: Boolean =
getConf(COLUMNAR_FALLBACK_PREFER_COLUMNAR)
- def numaBindingInfo: GlutenNumaBindingInfo = {
- val enableNumaBinding: Boolean = getConf(COLUMNAR_NUMA_BINDING_ENABLED)
- if (!enableNumaBinding) {
- GlutenNumaBindingInfo(enableNumaBinding = false)
- } else {
- val tmp = getConf(COLUMNAR_NUMA_BINDING_CORE_RANGE)
- if (tmp.isEmpty) {
- GlutenNumaBindingInfo(enableNumaBinding = false)
- } else {
- val numCores = conf.getConfString("spark.executor.cores", "1").toInt
- val coreRangeList: Array[String] = tmp.get.split('|').map(_.trim)
- GlutenNumaBindingInfo(enableNumaBinding = true, coreRangeList,
numCores)
- }
-
- }
- }
-
def cartesianProductTransformerEnabled: Boolean =
getConf(CARTESIAN_PRODUCT_TRANSFORMER_ENABLED)
@@ -1205,18 +1183,6 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
- val COLUMNAR_NUMA_BINDING_ENABLED =
- buildConf("spark.gluten.sql.columnar.numaBinding")
- .internal()
- .booleanConf
- .createWithDefault(false)
-
- val COLUMNAR_NUMA_BINDING_CORE_RANGE =
- buildConf("spark.gluten.sql.columnar.coreRange")
- .internal()
- .stringConf
- .createOptional
-
val COLUMNAR_MEMORY_BACKTRACE_ALLOCATION =
buildConf("spark.gluten.memory.backtrace.allocation")
.internal()
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index d8de26c15d..d332e3774a 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
@@ -26,7 +25,6 @@ import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.ExecutorManager
trait BaseGlutenPartition extends Partition with InputPartition {
def plan: Array[Byte]
@@ -59,12 +57,10 @@ class GlutenWholeStageColumnarRDD(
updateNativeMetrics: IMetrics => Unit,
enableCudf: Boolean = false)
extends RDD[ColumnarBatch](sc, rdds.getDependencies) {
- private val numaBindingInfo = GlutenConfig.get.numaBindingInfo
override def compute(split: Partition, context: TaskContext):
Iterator[ColumnarBatch] = {
GlutenTimeMetric.millis(pipelineTime) {
_ =>
- ExecutorManager.tryTaskSet(numaBindingInfo)
val (inputPartition, inputColumnarRDDPartitions) =
castNativePartition(split)
val inputIterators = rdds.getIterators(inputColumnarRDDPartitions,
context)
BackendsApiManager.getIteratorApiInstance.genFirstStageIterator(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index bf0392dabc..0c5e1b58b1 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -447,7 +447,6 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
new WholeStageZippedPartitionsRDD(
sparkContext,
inputRDDs,
- GlutenConfig.get.numaBindingInfo,
sparkContext.getConf,
wsCtx,
pipelineTime,
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
index 2c91057509..521388faae 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.GlutenNumaBindingInfo
import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
@@ -33,7 +32,6 @@ private[gluten] class ZippedPartitionsPartition(
class WholeStageZippedPartitionsRDD(
@transient private val sc: SparkContext,
var rdds: ColumnarInputRDDsWrapper,
- numaBindingInfo: GlutenNumaBindingInfo,
sparkConf: SparkConf,
resCtx: WholeStageTransformContext,
pipelineTime: SQLMetric,
@@ -50,7 +48,6 @@ class WholeStageZippedPartitionsRDD(
.genFinalStageIterator(
context,
inputIterators,
- numaBindingInfo,
sparkConf,
resCtx.root,
pipelineTime,
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/util/ExecutorManager.scala
b/gluten-substrait/src/main/scala/org/apache/spark/util/ExecutorManager.scala
deleted file mode 100644
index 4e55aff0d3..0000000000
---
a/gluten-substrait/src/main/scala/org/apache/spark/util/ExecutorManager.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.util
-
-import org.apache.gluten.config.GlutenNumaBindingInfo
-
-import org.apache.spark.{SparkContext, SparkEnv}
-
-import java.lang.management.ManagementFactory
-
-object ExecutorManager {
- var isTaskSet: Boolean = false
-
- def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds
-
- def tryTaskSet(numaInfo: GlutenNumaBindingInfo): Any = synchronized {
- if (numaInfo.enableNumaBinding && !isTaskSet) {
- val cmd_output =
- Utils.executeAndGetOutput(
- Seq("bash", "-c", "ps -ef | grep YarnCoarseGrainedExecutorBackend"))
- val getExecutorId = """--executor-id (\d+)""".r
- val executorIdOnLocalNode = {
- val tmp = for (m <- getExecutorId.findAllMatchIn(cmd_output)) yield
m.group(1)
- tmp.toList.distinct
- }
- val executorId = SparkEnv.get.executorId
- val numCorePerExecutor = numaInfo.numCoresPerExecutor
- val coreRange = numaInfo.totalCoreRange
- val shouldBindNumaIdx = executorIdOnLocalNode.indexOf(executorId) %
coreRange.size
- // val coreStartIdx = coreRange(shouldBindNumaIdx)._1
- // val coreEndIdx = coreRange(shouldBindNumaIdx)._2
- // scalastyle:off println
- System.out.println(
- s"executorId is $executorId, executorIdOnLocalNode is
$executorIdOnLocalNode")
- val taskSetCmd = s"taskset -cpa ${coreRange(shouldBindNumaIdx)}
${getProcessId()}"
- System.out.println(taskSetCmd)
- // scalastyle:on println
- isTaskSet = true
- Utils.executeCommand(Seq("bash", "-c", taskSetCmd))
- }
- }
-
- def getProcessId(): Int = {
- val runtimeMXBean = ManagementFactory.getRuntimeMXBean()
- runtimeMXBean.getName().split("@")(0).toInt
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]