This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 7da41e13 [AURON #1957] Fix panic/coredumps/memory leaks and Java thread InterruptedException errors. (#1980)
7da41e13 is described below

commit 7da41e132f0144900b013e03f95f63c3609a84cd
Author: Bryton Lee <[email protected]>
AuthorDate: Mon Mar 9 15:07:06 2026 +0800

    [AURON #1957] Fix panic/coredumps/memory leaks and Java thread InterruptedException errors. (#1980)
    
    # Which issue does this PR close?
    
    Closes #1957
    
    # Rationale for this change
    
    Optimized rt.rs to prevent a SendError panic, so that a native plan
    execution can be finalized gracefully.
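
    The shape of the fix, as a minimal standalone sketch (the channel and
    thread setup below are illustrative; only the check-the-flag-on-SendError
    logic mirrors the rt.rs change):

    ```rust
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
        mpsc::sync_channel,
    };

    fn main() {
        let (batch_sender, batch_receiver) = sync_channel::<i32>(1);
        let is_finalizing = Arc::new(AtomicBool::new(false));
        let is_finalizing_clone = is_finalizing.clone();

        let producer = std::thread::spawn(move || -> Result<(), String> {
            for batch in 0..100 {
                if let Err(err) = batch_sender.send(batch) {
                    // The receiver is gone. This is an error only when the
                    // runtime is NOT finalizing; during finalization it is
                    // the expected way for the producer to stop.
                    if is_finalizing_clone.load(Ordering::Acquire) {
                        return Ok(());
                    }
                    return Err(format!("unexpected send error: {err}"));
                }
            }
            Ok(())
        });

        // Finalizer side: set the flag BEFORE dropping the receiver, so a
        // producer that observes a failed send also observes the flag.
        is_finalizing.store(true, Ordering::Release);
        drop(batch_receiver);
        producer.join().unwrap().unwrap();
    }
    ```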
    
    The coredumps and memory leaks were all introduced by a memory issue
    inside ffi_reader_exec.rs: when a stream is dropped, the Java-side FFI
    import races with Rust and may write to memory that has already been
    dropped on the Rust side.
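
    A sketch of the fix under those constraints, with a plain struct standing
    in for arrow's FFI_ArrowArray and a hypothetical export_next_batch()
    standing in for the exportNextBatch JNI call:

    ```rust
    use tokio::task;

    // Stand-in for FFI_ArrowArray; in the real code the Java side writes
    // into it through the raw pointer passed over JNI.
    #[derive(Default)]
    struct FfiArray {
        len: i64,
    }

    // Hypothetical stand-in for the exportNextBatch JNI call.
    fn export_next_batch(_array_ptr: i64) -> bool {
        false
    }

    async fn read_one_batch() -> Result<bool, String> {
        // The array is created INSIDE the blocking closure, so its lifetime
        // is tied to the blocking task (which tokio never aborts mid-run).
        // Previously it lived on the async task's stack with only a raw
        // pointer moved into spawn_blocking; aborting the async task while
        // the blocking task was still running let Java write into
        // already-dropped Rust memory.
        let (has_next, array) = task::spawn_blocking(move || {
            let mut array = FfiArray::default();
            let array_ptr = &mut array as *mut FfiArray as i64;
            let has_next = export_next_batch(array_ptr);
            (has_next, array) // return the array so it outlives the JNI call
        })
        .await
        .map_err(|err| format!("spawn_blocking error: {err:?}"))?;

        if has_next {
            let _ = array.len; // the real code imports a RecordBatch here
        }
        Ok(has_next)
    }

    #[tokio::main]
    async fn main() {
        assert!(!read_one_batch().await.unwrap());
    }
    ```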
    
    Optimized the ArrowFFIExporter.scala close() logic to shut down the
    outputThread gracefully without raising errors.
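
    The close() change itself is in Scala; the same idempotent-close pattern
    is sketched below in Rust for consistency with the other examples (a stop
    flag stands in for Thread.interrupt(), and all names are illustrative):

    ```rust
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };

    struct Exporter {
        closed: AtomicBool,
        stop: Arc<AtomicBool>,
        output_thread: Option<std::thread::JoinHandle<()>>,
    }

    impl Exporter {
        fn close(&mut self) {
            // Idempotent close: only the first caller proceeds, mirroring
            // the AtomicBoolean.compareAndSet(false, true) guard in Scala.
            if self
                .closed
                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                .is_err()
            {
                return; // close() was already called
            }
            // Signal the worker to stop, then wait for it so its resources
            // are released before close() returns (the Scala code interrupts
            // the blocked outputThread and joins with a 5-second timeout).
            self.stop.store(true, Ordering::Release);
            if let Some(handle) = self.output_thread.take() {
                let _ = handle.join();
            }
        }
    }

    fn main() {
        let stop = Arc::new(AtomicBool::new(false));
        let stop_flag = stop.clone();
        let worker = std::thread::spawn(move || {
            while !stop_flag.load(Ordering::Acquire) {
                std::thread::yield_now(); // stand-in for exporting batches
            }
        });
        let mut exporter = Exporter {
            closed: AtomicBool::new(false),
            stop,
            output_thread: Some(worker),
        };
        exporter.close();
        exporter.close(); // second call returns immediately
    }
    ```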
    
    # What changes are included in this PR?
    Changed lib.rs, rt.rs, ffi_reader_exec.rs and ArrowFFIExporter.scala.
    
    # Are there any user-facing changes?
    No
    
    # How was this patch tested?
    It was tested on the latest master branch and some earlier versions.
---
 native-engine/auron/src/lib.rs                     | 11 ++-
 native-engine/auron/src/rt.rs                      | 39 ++++++++--
 .../datafusion-ext-plans/src/ffi_reader_exec.rs    | 32 +++++---
 .../execution/auron/arrowio/ArrowFFIExporter.scala | 85 ++++++++++++++++------
 4 files changed, 125 insertions(+), 42 deletions(-)

diff --git a/native-engine/auron/src/lib.rs b/native-engine/auron/src/lib.rs
index 2dfb336e..aaf71ee8 100644
--- a/native-engine/auron/src/lib.rs
+++ b/native-engine/auron/src/lib.rs
@@ -55,8 +55,15 @@ fn handle_unwinded(err: Box<dyn Any + Send>) {
 }
 
 fn handle_unwinded_scope<T: Default, E: Debug>(scope: impl FnOnce() -> Result<T, E>) -> T {
-    match std::panic::catch_unwind(AssertUnwindSafe(|| scope().expect("scope failed"))) {
-        Ok(v) => v,
+    match std::panic::catch_unwind(AssertUnwindSafe(|| scope())) {
+        Ok(Ok(v)) => v,
+        Ok(Err(err)) => {
+            // Defensive handling: this path should not be reached in normal operation
+            // after the SendError fixes (is_finalizing flag, FFI_ArrowArray lifetime).
+            // If triggered, it indicates a new issue that needs investigation.
+            log::error!("error in unwinded scope: {err:?}");
+            T::default()
+        }
         Err(err) => {
             handle_unwinded(err);
             T::default()
diff --git a/native-engine/auron/src/rt.rs b/native-engine/auron/src/rt.rs
index 37f40daf..005aac1d 100644
--- a/native-engine/auron/src/rt.rs
+++ b/native-engine/auron/src/rt.rs
@@ -16,7 +16,11 @@
 use std::{
     error::Error,
     panic::AssertUnwindSafe,
-    sync::{Arc, mpsc::Receiver},
+    sync::{
+        Arc,
+        atomic::{AtomicBool, Ordering},
+        mpsc::Receiver,
+    },
 };
 
 use arrow::{
@@ -63,6 +67,8 @@ pub struct NativeExecutionRuntime {
     batch_receiver: Receiver<Result<Option<RecordBatch>>>,
     tokio_runtime: Runtime,
     join_handle: JoinHandle<()>,
+    // Flag to indicate runtime is being finalized - used to gracefully handle SendError
+    is_finalizing: Arc<AtomicBool>,
 }
 
 impl NativeExecutionRuntime {
@@ -134,6 +140,8 @@ impl NativeExecutionRuntime {
         // spawn batch producer
         let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
         let err_sender = batch_sender.clone();
+        let is_finalizing = Arc::new(AtomicBool::new(false));
+        let is_finalizing_clone = is_finalizing.clone();
         let execution_plan_cloned = execution_plan.clone();
         let exec_ctx_cloned = exec_ctx.clone();
         let native_wrapper_cloned = native_wrapper.clone();
@@ -173,14 +181,24 @@ impl NativeExecutionRuntime {
                 .transpose()
                 .or_else(|err| df_execution_err!("{err}"))?
             {
-                batch_sender
-                    .send(Ok(Some(batch)))
-                    .or_else(|err| df_execution_err!("send batch error: {err}"))?;
+                if let Err(err) = batch_sender.send(Ok(Some(batch))) {
+                    if is_finalizing_clone.load(Ordering::Acquire) {
+                        log::debug!("send skipped: runtime is finalizing");
+                        return Ok(());
+                    } else {
+                        return df_execution_err!("unexpected send error: {err}");
+                    }
+                }
+            }
+            log::info!("stream exhausted, sending Ok(None) to signal completion");
+            if let Err(err) = batch_sender.send(Ok(None)) {
+                if is_finalizing_clone.load(Ordering::Acquire) {
+                    log::debug!("send skipped: runtime is finalizing");
+                    return Ok(());
+                } else {
+                    return df_execution_err!("unexpected send error: {err}");
+                }
             }
-            batch_sender
-                .send(Ok(None))
-                .or_else(|err| df_execution_err!("send batch error: {err}"))?;
-            log::info!("task finished");
             Ok::<_, DataFusionError>(())
         };
 
@@ -224,6 +242,7 @@ impl NativeExecutionRuntime {
             tokio_runtime,
             batch_receiver,
             join_handle,
+            is_finalizing,
         };
         Ok(native_execution_runtime)
     }
@@ -266,6 +285,10 @@ impl NativeExecutionRuntime {
         log::info!("(partition={partition}) native execution finalizing");
         self.update_metrics().unwrap_or_default();
         drop(self.plan);
+
+        // Set finalizing flag before dropping receiver to allow graceful SendError
+        // handling
+        self.is_finalizing.store(true, Ordering::Release);
         drop(self.batch_receiver);
 
         cancel_all_tasks(&self.exec_ctx.task_ctx()); // cancel all pending streams
diff --git a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
index 1d579234..81097c9e 100644
--- a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
@@ -26,7 +26,7 @@ use arrow::{
 };
 use auron_jni_bridge::{jni_call, jni_call_static, jni_new_global_ref, jni_new_string};
 use datafusion::{
-    error::Result,
+    error::{DataFusionError, Result},
     execution::context::TaskContext,
     physical_expr::EquivalenceProperties,
     physical_plan::{
@@ -37,7 +37,7 @@ use datafusion::{
         metrics::{ExecutionPlanMetricsSet, MetricsSet},
     },
 };
-use datafusion_ext_commons::arrow::array_size::BatchSize;
+use datafusion_ext_commons::{arrow::array_size::BatchSize, df_execution_err};
 use jni::objects::GlobalRef;
 use once_cell::sync::OnceCell;
 
@@ -192,27 +192,41 @@ fn read_ffi(
             struct AutoCloseableExporter(GlobalRef);
             impl Drop for AutoCloseableExporter {
                 fn drop(&mut self) {
-                    let _ = jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ());
+                    if let Err(e) = jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ()) {
+                        log::error!("FFIReader: JNI close() failed: {e:?}");
+                    }
                 }
             }
             let exporter = AutoCloseableExporter(exporter);
+            log::info!("FFIReader: starting to read from ArrowFFIExporter");
 
             loop {
                 let batch = {
                     // load batch from ffi
-                    let mut ffi_arrow_array = FFI_ArrowArray::empty();
-                    let ffi_arrow_array_ptr = &mut ffi_arrow_array as *mut FFI_ArrowArray as i64;
+                    // IMPORTANT: FFI_ArrowArray is created inside spawn_blocking to ensure its
+                    // lifetime is tied to the blocking task. This prevents data races if the
+                    // async task is aborted while spawn_blocking is still running.
                     let exporter_obj = exporter.0.clone();
-                    let has_next = tokio::task::spawn_blocking(move || {
-                        jni_call!(
+                    let ffi_result = match tokio::task::spawn_blocking(move || {
+                        let mut ffi_arrow_array = FFI_ArrowArray::empty();
+                        let ffi_arrow_array_ptr =
+                            &mut ffi_arrow_array as *mut FFI_ArrowArray as i64;
+                        let has_next = jni_call!(
                             AuronArrowFFIExporter(exporter_obj.as_obj())
                                 .exportNextBatch(ffi_arrow_array_ptr) -> bool
-                        )
+                        )?;
+                        Ok::<_, DataFusionError>((has_next, ffi_arrow_array))
                     })
                     .await
-                    .expect("tokio spawn_blocking error")?;
+                    {
+                        Ok(Ok(result)) => result,
+                        Ok(Err(err)) => return Err(err),
+                        Err(err) => return df_execution_err!("spawn_blocking error: {err:?}"),
+                    };
 
+                    let (has_next, ffi_arrow_array) = ffi_result;
                     if !has_next {
+                        log::info!("FFIReader: no more batches, exiting read loop");
                         break;
                     }
                     let import_data_type = DataType::Struct(schema.fields().clone());
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
index 591ae266..a23acebf 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
@@ -27,6 +27,7 @@ import org.apache.arrow.c.Data
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider
 import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.util.Using
 import org.apache.spark.sql.catalyst.InternalRow
@@ -42,7 +43,8 @@ import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.spark.configuration.SparkAuronConfiguration
 
 class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
-    extends AuronArrowFFIExporter {
+    extends AuronArrowFFIExporter
+    with Logging {
   private val sparkAuronConfig: AuronConfiguration =
     AuronAdaptor.getInstance.getAuronConfiguration
   private val maxBatchNumRows = sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
@@ -58,6 +60,13 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
   private case class Finished(t: Option[Throwable]) extends QueueState
 
   private val tc = TaskContext.get()
+  // Build a meaningful identifier from TaskContext info
+  private val exporterId = if (tc != null) {
+    s"stage-${tc.stageId()}-part-${tc.partitionId()}-tid-${tc.taskAttemptId()}-${System.identityHashCode(this)}"
+  } else {
+    s"no-context-${System.identityHashCode(this)}"
+  }
+  private val closed = new java.util.concurrent.atomic.AtomicBoolean(false)
   private val outputQueue: BlockingQueue[QueueState] = new ArrayBlockingQueue[QueueState](16)
   private val processingQueue: BlockingQueue[Unit] = new ArrayBlockingQueue[Unit](16)
   private var currentRoot: VectorSchemaRoot = _
@@ -108,32 +117,40 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
 
         nativeCurrentUser.doAs(new PrivilegedExceptionAction[Unit] {
           override def run(): Unit = {
-            while (tc == null || (!tc.isCompleted() && !tc.isInterrupted())) {
-              if (!rowIter.hasNext) {
-                outputQueue.put(Finished(None))
-                return
-              }
+            try {
+              while (tc == null || (!tc.isCompleted() && !tc.isInterrupted())) {
+                if (!rowIter.hasNext) {
+                  outputQueue.put(Finished(None))
+                  return
+                }
 
-              Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) { allocator =>
-                Using.resource(VectorSchemaRoot.create(arrowSchema, allocator)) { root =>
-                  val arrowWriter = ArrowWriter.create(root)
-                  while (rowIter.hasNext
-                    && allocator.getAllocatedMemory < maxBatchMemorySize
-                    && arrowWriter.currentCount < maxBatchNumRows) {
-                    arrowWriter.write(rowIter.next())
+                Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) { allocator =>
+                  Using.resource(VectorSchemaRoot.create(arrowSchema, allocator)) { root =>
+                    val arrowWriter = ArrowWriter.create(root)
+                    while (rowIter.hasNext
+                      && allocator.getAllocatedMemory < maxBatchMemorySize
+                      && arrowWriter.currentCount < maxBatchNumRows) {
+                      arrowWriter.write(rowIter.next())
+                    }
+                    arrowWriter.finish()
+
+                    // export root
+                    currentRoot = root
+                    outputQueue.put(NextBatch)
+
+                    // wait for processing next batch
+                    processingQueue.take()
                   }
-                  arrowWriter.finish()
-
-                  // export root
-                  currentRoot = root
-                  outputQueue.put(NextBatch)
-
-                  // wait for processing next batch
-                  processingQueue.take()
                 }
               }
+              outputQueue.put(Finished(None))
+            } catch {
+              case _: InterruptedException =>
+                // Thread was interrupted during close(), this is expected - just exit gracefully
+                logDebug(s"ArrowFFIExporter-$exporterId: outputThread interrupted, exiting")
+                outputQueue.clear()
+                outputQueue.put(Finished(None))
             }
-            outputQueue.put(Finished(None))
           }
         })
       }
@@ -156,6 +173,28 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
   }
 
   override def close(): Unit = {
-    outputThread.interrupt()
+    // Ensure close() is idempotent - only execute once
+    if (!closed.compareAndSet(false, true)) {
+      logDebug(s"ArrowFFIExporter-$exporterId: close() already called, skipping")
+      return
+    }
+
+    if (outputThread.isAlive) {
+      logDebug(s"ArrowFFIExporter-$exporterId: interrupting outputThread")
+      outputThread.interrupt()
+      // Wait for the thread to terminate to ensure resources are properly released
+      try {
+        outputThread.join(5000) // Wait up to 5 seconds
+        if (outputThread.isAlive) {
+          logWarning(
+            s"ArrowFFIExporter-$exporterId: outputThread did not terminate within 5 seconds")
+        }
+      } catch {
+        case _: InterruptedException =>
+          // Ignore - we don't need to propagate this to caller
+          logDebug(s"ArrowFFIExporter-$exporterId: interrupted while waiting for outputThread")
+      }
+      logDebug(s"ArrowFFIExporter-$exporterId: close() completed")
+    }
   }
 }
