This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 5998f1cd [AURON #2102] Initializing JavaClasses in JNI and decoupling
Spark (#2103)
5998f1cd is described below
commit 5998f1cd630acc4881786d7309b362c86e51f549
Author: zhangmang <[email protected]>
AuthorDate: Wed Mar 18 20:16:57 2026 +0800
[AURON #2102] Initializing JavaClasses in JNI and decoupling Spark (#2103)
# Which issue does this PR close?
Closes #2102
# Rationale for this change
Many fields in JavaClasses are tightly coupled with Spark Java code, so this
change decides whether to load the Spark-specific classes based on the engine.
# What changes are included in this PR?
* Introduce getEngineName API for `JniBridge` and `AuronAdaptor`
* Modify `jni_bridge` to add engine type checking when initializing
JavaClasses
# Are there any user-facing changes?
* No
# How was this patch tested?
* No
---
.../java/org/apache/auron/jni/AuronAdaptor.java | 5 +
.../main/java/org/apache/auron/jni/JniBridge.java | 4 +
.../org/apache/auron/jni/MockAuronAdaptor.java | 5 +
.../org/apache/auron/jni/FlinkAuronAdaptor.java | 5 +
native-engine/auron-jni-bridge/src/jni_bridge.rs | 153 +++++++++++++++++----
native-engine/auron/src/metrics.rs | 8 +-
native-engine/auron/src/rt.rs | 4 +-
.../org/apache/auron/jni/SparkAuronAdaptor.java | 5 +
8 files changed, 160 insertions(+), 29 deletions(-)
diff --git a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
index 463fb727..0162bef5 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/AuronAdaptor.java
@@ -125,4 +125,9 @@ public abstract class AuronAdaptor {
* @throws UnsupportedOperationException If the method is not implemented.
*/
public abstract AuronUDFWrapperContext
getAuronUDFWrapperContext(ByteBuffer udfSerialized);
+
+ /**
+ * Returns the name of the current engine, such as Spark or Flink.
+ */
+ public abstract String getEngineName();
}
diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
index 121970c2..d0853608 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
@@ -130,6 +130,10 @@ public class JniBridge {
return getConfValue(confKey);
}
+ public static String getEngineName() {
+ return AuronAdaptor.getInstance().getEngineName();
+ }
+
static <T> T getConfValue(String confKey) {
Class<? extends AuronConfiguration> confClass =
AuronAdaptor.getInstance().getAuronConfiguration().getClass();
diff --git
a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
index 6cb2407f..61022941 100644
--- a/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
+++ b/auron-core/src/test/java/org/apache/auron/jni/MockAuronAdaptor.java
@@ -59,4 +59,9 @@ public class MockAuronAdaptor extends AuronAdaptor {
public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
udfSerialized) {
return new MockAuronUDFWrapperContext(udfSerialized);
}
+
+ @Override
+ public String getEngineName() {
+ return "Test";
+ }
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
index 11a40498..ba95f93a 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/jni/FlinkAuronAdaptor.java
@@ -72,4 +72,9 @@ public class FlinkAuronAdaptor extends AuronAdaptor {
public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
byteBuffer) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public String getEngineName() {
+ return "Flink";
+ }
}
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index f058901a..a98ae7f5 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -442,21 +442,21 @@ pub struct JavaClasses<'a> {
pub cSparkFileSegment: SparkFileSegment<'a>,
pub cSparkSQLMetric: SparkSQLMetric<'a>,
- pub cSparkMetricNode: SparkMetricNode<'a>,
pub cSparkAuronUDFWrapperContext: SparkAuronUDFWrapperContext<'a>,
pub cSparkUDAFWrapperContext: SparkUDAFWrapperContext<'a>,
pub cSparkUDTFWrapperContext: SparkUDTFWrapperContext<'a>,
pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
pub cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase<'a>,
- pub cAuronCallNativeWrapper: AuronCallNativeWrapper<'a>,
pub cAuronOnHeapSpillManager: AuronOnHeapSpillManager<'a>,
pub cAuronNativeParquetSinkUtils: AuronNativeParquetSinkUtils<'a>,
pub cAuronBlockObject: AuronBlockObject<'a>,
+ pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
+
pub cAuronArrowFFIExporter: AuronArrowFFIExporter<'a>,
+ pub cAuronCallNativeWrapper: AuronCallNativeWrapper<'a>,
pub cAuronFSDataInputWrapper: AuronFSDataInputWrapper<'a>,
pub cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper<'a>,
- pub cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper<'a>,
-
+ pub cMetricNode: MetricNode<'a>,
pub cAuronUDFWrapperContext: AuronUDFWrapperContext<'a>,
}
@@ -481,6 +481,61 @@ impl JavaClasses<'static> {
)?
.l()?;
+ let engine_name_java = env
+ .call_static_method_unchecked(
+ jni_bridge.class,
+ jni_bridge.method_getEngineName,
+ jni_bridge.method_getEngineName_ret.clone(),
+ &[],
+ )?
+ .l()?;
+ let engine_name = env
+ .get_string(engine_name_java.into())
+ .map(|s| String::from(s))
+ .expect("engine_name is not valid");
+ log::info!("Runtime engine is {engine_name}");
+
+ let (
+ c_spark_file_segment,
+ c_spark_sql_metric,
+ c_spark_auron_udf_wrapper_context,
+ c_spark_udaf_wrapper_context,
+ c_spark_udtf_wrapper_context,
+ c_spark_udaf_mem_tracker,
+ c_auron_rss_partition_writer_base,
+ c_auron_on_heap_spill_manager,
+ c_auron_native_parquet_sink_utils,
+ c_auron_block_object,
+ c_auron_json_fallback_wrapper,
+ ) = match engine_name.as_str() {
+ "Spark" => (
+ SparkFileSegment::new(env)?,
+ SparkSQLMetric::new(env)?,
+ SparkAuronUDFWrapperContext::new(env)?,
+ SparkUDAFWrapperContext::new(env)?,
+ SparkUDTFWrapperContext::new(env)?,
+ SparkUDAFMemTracker::new(env)?,
+ AuronRssPartitionWriterBase::new(env)?,
+ AuronOnHeapSpillManager::new(env)?,
+ AuronNativeParquetSinkUtils::new(env)?,
+ AuronBlockObject::new(env)?,
+ AuronJsonFallbackWrapper::new(env)?,
+ ),
+ _ => (
+ SparkFileSegment::default(),
+ SparkSQLMetric::default(),
+ SparkAuronUDFWrapperContext::default(),
+ SparkUDAFWrapperContext::default(),
+ SparkUDTFWrapperContext::default(),
+ SparkUDAFMemTracker::default(),
+ AuronRssPartitionWriterBase::default(),
+ AuronOnHeapSpillManager::default(),
+ AuronNativeParquetSinkUtils::default(),
+ AuronBlockObject::default(),
+ AuronJsonFallbackWrapper::default(),
+ ),
+ };
+
let java_classes = JavaClasses {
jvm: env.get_java_vm()?,
classloader: get_global_ref_jobject(env, classloader)?,
@@ -505,23 +560,23 @@ impl JavaClasses<'static> {
cHadoopFileSystem: HadoopFileSystem::new(env)?,
cHadoopPath: HadoopPath::new(env)?,
- cSparkFileSegment: SparkFileSegment::new(env)?,
- cSparkSQLMetric: SparkSQLMetric::new(env)?,
- cSparkMetricNode: SparkMetricNode::new(env)?,
- cSparkAuronUDFWrapperContext:
SparkAuronUDFWrapperContext::new(env)?,
- cSparkUDAFWrapperContext: SparkUDAFWrapperContext::new(env)?,
- cSparkUDTFWrapperContext: SparkUDTFWrapperContext::new(env)?,
- cSparkUDAFMemTracker: SparkUDAFMemTracker::new(env)?,
- cAuronRssPartitionWriterBase:
AuronRssPartitionWriterBase::new(env)?,
- cAuronCallNativeWrapper: AuronCallNativeWrapper::new(env)?,
- cAuronOnHeapSpillManager: AuronOnHeapSpillManager::new(env)?,
- cAuronNativeParquetSinkUtils:
AuronNativeParquetSinkUtils::new(env)?,
- cAuronBlockObject: AuronBlockObject::new(env)?,
+ cSparkFileSegment: c_spark_file_segment,
+ cSparkSQLMetric: c_spark_sql_metric,
+ cSparkAuronUDFWrapperContext:
c_spark_auron_udf_wrapper_context,
+ cSparkUDAFWrapperContext: c_spark_udaf_wrapper_context,
+ cSparkUDTFWrapperContext: c_spark_udtf_wrapper_context,
+ cSparkUDAFMemTracker: c_spark_udaf_mem_tracker,
+ cAuronRssPartitionWriterBase:
c_auron_rss_partition_writer_base,
+ cAuronOnHeapSpillManager: c_auron_on_heap_spill_manager,
+ cAuronNativeParquetSinkUtils:
c_auron_native_parquet_sink_utils,
+ cAuronBlockObject: c_auron_block_object,
+ cAuronJsonFallbackWrapper: c_auron_json_fallback_wrapper,
+
cAuronArrowFFIExporter: AuronArrowFFIExporter::new(env)?,
+ cAuronCallNativeWrapper: AuronCallNativeWrapper::new(env)?,
cAuronFSDataInputWrapper: AuronFSDataInputWrapper::new(env)?,
cAuronFSDataOutputWrapper: AuronFSDataOutputWrapper::new(env)?,
- cAuronJsonFallbackWrapper: AuronJsonFallbackWrapper::new(env)?,
-
+ cMetricNode: MetricNode::new(env)?,
cAuronUDFWrapperContext: AuronUDFWrapperContext::new(env)?,
};
log::info!("Initializing JavaClasses finished");
@@ -587,6 +642,8 @@ pub struct JniBridge<'a> {
pub method_booleanConf_ret: ReturnType,
pub method_stringConf: JStaticMethodID,
pub method_stringConf_ret: ReturnType,
+ pub method_getEngineName: JStaticMethodID,
+ pub method_getEngineName_ret: ReturnType,
}
impl<'a> JniBridge<'a> {
pub const SIG_TYPE: &'static str = "org/apache/auron/jni/JniBridge";
@@ -700,6 +757,12 @@ impl<'a> JniBridge<'a> {
"(Ljava/lang/String;)Ljava/lang/String;",
)?,
method_stringConf_ret: ReturnType::Object,
+ method_getEngineName: env.get_static_method_id(
+ class,
+ "getEngineName",
+ "()Ljava/lang/String;",
+ )?,
+ method_getEngineName_ret: ReturnType::Object,
})
}
}
@@ -1110,6 +1173,10 @@ impl<'a> SparkFileSegment<'a> {
method_length_ret: ReturnType::Primitive(Primitive::Long),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1129,22 +1196,26 @@ impl<'a> SparkSQLMetric<'a> {
method_add_ret: ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
-pub struct SparkMetricNode<'a> {
+pub struct MetricNode<'a> {
pub class: JClass<'a>,
pub method_getChild: JMethodID,
pub method_getChild_ret: ReturnType,
pub method_add: JMethodID,
pub method_add_ret: ReturnType,
}
-impl<'a> SparkMetricNode<'a> {
- pub const SIG_TYPE: &'static str =
"org/apache/auron/metric/SparkMetricNode";
+impl<'a> MetricNode<'a> {
+ pub const SIG_TYPE: &'static str = "org/apache/auron/metric/MetricNode";
- pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkMetricNode<'a>> {
+ pub fn new(env: &JNIEnv<'a>) -> JniResult<MetricNode<'a>> {
let class = get_global_jclass(env, Self::SIG_TYPE)?;
- Ok(SparkMetricNode {
+ Ok(MetricNode {
class,
method_getChild: env.get_method_id(
class,
@@ -1181,6 +1252,10 @@ impl<'a> AuronRssPartitionWriterBase<'_> {
method_flush_ret: ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1202,6 +1277,10 @@ impl<'a> SparkAuronUDFWrapperContext<'a> {
method_eval_ret: ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1318,6 +1397,10 @@ impl<'a> SparkUDAFWrapperContext<'a> {
method_unspill_ret: ReturnType::Object,
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1347,6 +1430,10 @@ impl<'a> SparkUDTFWrapperContext<'a> {
method_terminateLoop_ret: ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1380,6 +1467,10 @@ impl<'a> SparkUDAFMemTracker<'a> {
method_updateUsed_ret: ReturnType::Primitive(Primitive::Boolean),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1470,6 +1561,10 @@ impl<'a> AuronOnHeapSpillManager<'a> {
method_releaseSpill_ret: ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1502,6 +1597,10 @@ impl<'a> AuronNativeParquetSinkUtils<'a> {
method_completeOutput_ret: ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1562,6 +1661,10 @@ impl<'a> AuronBlockObject<'a> {
method_throwFetchFailed_ret:
ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
#[allow(non_snake_case)]
@@ -1648,6 +1751,10 @@ impl<'a> AuronJsonFallbackWrapper<'a> {
method_parseJsons_ret: ReturnType::Primitive(Primitive::Void),
})
}
+
+ fn default() -> Self {
+ unsafe { std::mem::zeroed() }
+ }
}
fn get_global_jclass(env: &JNIEnv<'_>, cls: &str) ->
JniResult<JClass<'static>> {
diff --git a/native-engine/auron/src/metrics.rs
b/native-engine/auron/src/metrics.rs
index e52ee4bb..30d957a3 100644
--- a/native-engine/auron/src/metrics.rs
+++ b/native-engine/auron/src/metrics.rs
@@ -19,7 +19,7 @@ use auron_jni_bridge::{jni_call, jni_new_string};
use datafusion::{common::Result, physical_plan::ExecutionPlan};
use jni::objects::JObject;
-pub fn update_spark_metric_node(
+pub fn update_metric_node(
metric_node: JObject,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Result<()> {
@@ -42,9 +42,9 @@ pub fn update_spark_metric_node(
// update children nodes
for (i, &child_plan) in execution_plan.children().iter().enumerate() {
let child_metric_node = jni_call!(
- SparkMetricNode(metric_node).getChild(i as i32) -> JObject
+ MetricNode(metric_node).getChild(i as i32) -> JObject
)?;
- update_spark_metric_node(child_metric_node.as_obj(),
child_plan.clone())?;
+ update_metric_node(child_metric_node.as_obj(), child_plan.clone())?;
}
Ok(())
}
@@ -52,7 +52,7 @@ pub fn update_spark_metric_node(
fn update_metrics(metric_node: JObject, metric_values: &[(&str, i64)]) ->
Result<()> {
for &(name, value) in metric_values {
let jname = jni_new_string!(&name)?;
- jni_call!(SparkMetricNode(metric_node).add(jname.as_obj(), value) ->
())?;
+ jni_call!(MetricNode(metric_node).add(jname.as_obj(), value) -> ())?;
}
Ok(())
}
diff --git a/native-engine/auron/src/rt.rs b/native-engine/auron/src/rt.rs
index 005aac1d..29a0f180 100644
--- a/native-engine/auron/src/rt.rs
+++ b/native-engine/auron/src/rt.rs
@@ -57,7 +57,7 @@ use tokio::{runtime::Runtime, task::JoinHandle};
use crate::{
handle_unwinded_scope,
logging::{THREAD_PARTITION_ID, THREAD_STAGE_ID, THREAD_TID},
- metrics::update_spark_metric_node,
+ metrics::update_metric_node,
};
pub struct NativeExecutionRuntime {
@@ -301,7 +301,7 @@ impl NativeExecutionRuntime {
let metrics = jni_call!(
AuronCallNativeWrapper(self.native_wrapper.as_obj()).getMetrics()
-> JObject
)?;
- update_spark_metric_node(metrics.as_obj(), self.plan.clone())?;
+ update_metric_node(metrics.as_obj(), self.plan.clone())?;
Ok(())
}
}
diff --git
a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
index 005e9753..3bf45cb5 100644
--- a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
+++ b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
@@ -94,4 +94,9 @@ public class SparkAuronAdaptor extends AuronAdaptor {
public AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
udfSerialized) {
return new SparkAuronUDFWrapperContext(udfSerialized);
}
+
+ @Override
+ public String getEngineName() {
+ return "Spark";
+ }
}