This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 7da2ba5d [AURON #2104] Migrate the Native dependency configuration
from SparkAuronConfiguration to AuronConfiguration (#2105)
7da2ba5d is described below
commit 7da2ba5d10113ede0706db4cc228e1d563e7faf3
Author: zhangmang <[email protected]>
AuthorDate: Thu Mar 19 17:02:29 2026 +0800
[AURON #2104] Migrate the Native dependency configuration from
SparkAuronConfiguration to AuronConfiguration (#2105)
# Which issue does this PR close?
Closes #2104
# Rationale for this change
To decouple the Native Engine from Spark, migrate the configuration items
currently in the Spark module to the auron-core module.
# What changes are included in this PR?
Migrate these configurations from SparkAuronConfiguration to
AuronConfiguration.
* TOKIO_WORKER_THREADS_PER_CPU
* SPARK_TASK_CPUS
* SUGGESTED_BATCH_MEM_SIZE
# Are there any user-facing changes?
* No
# How was this patch tested?
* No test needed; this is a configuration refactoring with no behavior change.
---
.../auron/configuration/AuronConfiguration.java | 26 ++++++++++++++++++++++
native-engine/auron-jni-bridge/src/conf.rs | 2 +-
native-engine/auron/src/rt.rs | 6 ++---
.../configuration/SparkAuronConfiguration.java | 25 ---------------------
.../spark/sql/auron/SparkUDTFWrapperContext.scala | 3 +--
.../execution/auron/arrowio/ArrowFFIExporter.scala | 3 +--
6 files changed, 32 insertions(+), 33 deletions(-)
diff --git
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
index 31017d8b..a2bd08d4 100644
---
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
+++
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
@@ -39,6 +39,32 @@ public abstract class AuronConfiguration {
.withDescription("Log level for native execution.")
.withDefaultValue("info");
+ public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU =
new ConfigOption<>(Integer.class)
+ .withKey("auron.tokio.worker.threads.per.cpu")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Number of Tokio worker threads to create per CPU core
(spark.task.cpus). Set to 0 for automatic detection "
+ + "based on available CPU cores. This setting
controls the thread pool size for Tokio-based asynchronous operations.")
+ .withDefaultValue(0);
+
+ public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = new
ConfigOption<>(Integer.class)
+ .withKey("auron.suggested.batch.memSize")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Suggested memory size in bytes for record batches. This
setting controls the target memory allocation "
+ + "for individual data batches to optimize memory
usage and processing efficiency. Default is 8MB (8,388,608 bytes).")
+ .withDefaultValue(8388608);
+
+ public static final ConfigOption<Integer> TASK_CPUS = new
ConfigOption<>(Integer.class)
+ .withKey("task.cpus")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Number of CPU cores allocated per Spark task. This
setting determines the parallelism level "
+ + "for individual tasks and affects resource
allocation and task scheduling. "
+ + "In Spark, the value is retrieved from SparkEnv
via the 'spark.task.cpus' option; "
+ + "if not configured, the default value of 1 is
used.")
+ .withDefaultValue(1);
+
public abstract <T> Optional<T> getOptional(ConfigOption<T> option);
public <T> T get(ConfigOption<T> option) {
diff --git a/native-engine/auron-jni-bridge/src/conf.rs
b/native-engine/auron-jni-bridge/src/conf.rs
index 351432f9..fa6c1d57 100644
--- a/native-engine/auron-jni-bridge/src/conf.rs
+++ b/native-engine/auron-jni-bridge/src/conf.rs
@@ -47,7 +47,7 @@ define_conf!(IntConf, PARQUET_METADATA_CACHE_SIZE);
define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
define_conf!(IntConf, SPARK_IO_COMPRESSION_ZSTD_LEVEL);
define_conf!(IntConf, TOKIO_WORKER_THREADS_PER_CPU);
-define_conf!(IntConf, SPARK_TASK_CPUS);
+define_conf!(IntConf, TASK_CPUS);
define_conf!(IntConf, SHUFFLE_COMPRESSION_TARGET_BUF_SIZE);
define_conf!(StringConf, SPILL_COMPRESSION_CODEC);
define_conf!(BooleanConf, SMJ_FALLBACK_ENABLE);
diff --git a/native-engine/auron/src/rt.rs b/native-engine/auron/src/rt.rs
index 29a0f180..b3d4addd 100644
--- a/native-engine/auron/src/rt.rs
+++ b/native-engine/auron/src/rt.rs
@@ -29,7 +29,7 @@ use arrow::{
record_batch::RecordBatch,
};
use auron_jni_bridge::{
- conf::{IntConf, SPARK_TASK_CPUS, TOKIO_WORKER_THREADS_PER_CPU},
+ conf::{IntConf, TASK_CPUS, TOKIO_WORKER_THREADS_PER_CPU},
is_task_running, jni_call, jni_call_static, jni_convert_byte_array,
jni_exception_check,
jni_exception_occurred, jni_new_global_ref, jni_new_object, jni_new_string,
};
@@ -105,8 +105,8 @@ impl NativeExecutionRuntime {
let num_worker_threads = {
let worker_threads_per_cpu =
TOKIO_WORKER_THREADS_PER_CPU.value().unwrap_or(0);
- let spark_task_cpus = SPARK_TASK_CPUS.value().unwrap_or(0);
- worker_threads_per_cpu * spark_task_cpus
+ let task_cpus = TASK_CPUS.value().unwrap_or(0);
+ worker_threads_per_cpu * task_cpus
};
// create tokio runtime
diff --git
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index 3cc1e534..a1bf8e3c 100644
---
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -202,23 +202,6 @@ public class SparkAuronConfiguration extends
AuronConfiguration {
+ "but require more CPU time and memory.")
.withDynamicDefaultValue(_conf ->
SparkEnv.get().conf().getInt("spark.io.compression.zstd.level", 1));
- public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU =
new ConfigOption<>(Integer.class)
- .withKey("auron.tokio.worker.threads.per.cpu")
- .withCategory("Runtime Configuration")
- .withDescription(
- "Number of Tokio worker threads to create per CPU core
(spark.task.cpus). Set to 0 for automatic detection "
- + "based on available CPU cores. This setting
controls the thread pool size for Tokio-based asynchronous operations.")
- .withDefaultValue(0);
-
- public static final ConfigOption<Integer> SPARK_TASK_CPUS = new
ConfigOption<>(Integer.class)
- .withKey("task.cpus")
- .withCategory("Runtime Configuration")
- .withDescription(
- "Number of CPU cores allocated per Spark task. This
setting determines the parallelism level "
- + "for individual tasks and affects resource
allocation and task scheduling. "
- + "Defaults to spark.task.cpus.")
- .withDynamicDefaultValue(_conf ->
SparkEnv.get().conf().getInt("spark.task.cpus", 1));
-
public static final ConfigOption<Boolean> FORCE_SHUFFLED_HASH_JOIN = new
ConfigOption<>(Boolean.class)
.withKey("auron.forceShuffledHashJoin")
.withCategory("Operator Supports")
@@ -276,14 +259,6 @@ public class SparkAuronConfiguration extends
AuronConfiguration {
+ "of the available on-heap memory can be used for
spilling data to disk when memory pressure occurs.")
.withDefaultValue(0.9);
- public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = new
ConfigOption<>(Integer.class)
- .withKey("auron.suggested.batch.memSize")
- .withCategory("Runtime Configuration")
- .withDescription(
- "Suggested memory size in bytes for record batches. This
setting controls the target memory allocation "
- + "for individual data batches to optimize memory
usage and processing efficiency. Default is 8MB (8,388,608 bytes).")
- .withDefaultValue(8388608);
-
public static final ConfigOption<Boolean> PARSE_JSON_ERROR_FALLBACK = new
ConfigOption<>(Boolean.class)
.withKey("auron.parseJsonError.fallback")
.withCategory("Expression/Function Supports")
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
index e830d838..52199321 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDTFWrapperContext.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.auron.configuration.AuronConfiguration
import org.apache.auron.jni.AuronAdaptor
-import org.apache.auron.spark.configuration.SparkAuronConfiguration
case class SparkUDTFWrapperContext(serialized: ByteBuffer) extends Logging {
private val (expr, javaParamsSchema) =
@@ -66,7 +65,7 @@ case class SparkUDTFWrapperContext(serialized: ByteBuffer)
extends Logging {
AuronAdaptor.getInstance.getAuronConfiguration
private val batchSize =
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
private val maxBatchMemorySize =
-
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
+ sparkAuronConfig.getInteger(AuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
private val dictionaryProvider: DictionaryProvider = new
MapDictionaryProvider()
private val javaOutputSchema = StructType(
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
index a23acebf..0192aa35 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.auron.arrowio.AuronArrowFFIExporter
import org.apache.auron.configuration.AuronConfiguration
import org.apache.auron.jni.AuronAdaptor
-import org.apache.auron.spark.configuration.SparkAuronConfiguration
class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
extends AuronArrowFFIExporter
@@ -49,7 +48,7 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow],
schema: StructType)
AuronAdaptor.getInstance.getAuronConfiguration
private val maxBatchNumRows =
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
private val maxBatchMemorySize =
-
sparkAuronConfig.getInteger(SparkAuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
+ sparkAuronConfig.getInteger(AuronConfiguration.SUGGESTED_BATCH_MEM_SIZE)
private val arrowSchema = ArrowUtils.toArrowSchema(schema)
private val emptyDictionaryProvider = new MapDictionaryProvider()