This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 58dee739 minor: refactor prepare_output so that it does not require an ExecutionContext (#1194)
58dee739 is described below
commit 58dee739b6b8c3e7c6057e01c72307cdcff56ada
Author: Andy Grove <[email protected]>
AuthorDate: Sun Dec 22 12:25:35 2024 -0700
minor: refactor prepare_output so that it does not require an ExecutionContext (#1194)
---
native/core/src/execution/jni_api.rs | 20 +++++++-------------
1 file changed, 7 insertions(+), 13 deletions(-)
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index eb73675b..2c1a55f4 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -231,7 +231,7 @@ fn prepare_output(
array_addrs: jlongArray,
schema_addrs: jlongArray,
output_batch: RecordBatch,
- exec_context: &mut ExecutionContext,
+ validate: bool,
) -> CometResult<jlong> {
let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
let num_cols = env.get_array_length(&array_address_array)? as usize;
@@ -255,7 +255,7 @@ fn prepare_output(
)));
}
- if exec_context.debug_native {
+ if validate {
// Validate the output arrays.
for array in results.iter() {
let array_data = array.to_data();
@@ -275,9 +275,6 @@ fn prepare_output(
i += 1;
}
- // Update metrics
- update_metrics(env, exec_context)?;
-
Ok(num_rows as jlong)
}
@@ -356,22 +353,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let next_item = exec_context.stream.as_mut().unwrap().next();
let poll_output = exec_context.runtime.block_on(async { poll!(next_item) });
+ // Update metrics
+ update_metrics(&mut env, exec_context)?;
+
match poll_output {
Poll::Ready(Some(output)) => {
+ // prepare output for FFI transfer
return prepare_output(
&mut env,
array_addrs,
schema_addrs,
output?,
- exec_context,
+ exec_context.debug_native,
);
}
Poll::Ready(None) => {
// Reaches EOF of output.
-
- // Update metrics
- update_metrics(&mut env, exec_context)?;
-
if exec_context.explain_native {
if let Some(plan) = &exec_context.root_op {
let formatted_plan_str =
@@ -391,9 +388,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
// A poll pending means there are more than one blocking operators,
// we don't need go back-forth between JVM/Native. Just keeping polling.
Poll::Pending => {
- // Update metrics
- update_metrics(&mut env, exec_context)?;
-
// Pull input batches
pull_input_batches(exec_context)?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]