alamb commented on code in PR #20451: URL: https://github.com/apache/datafusion/pull/20451#discussion_r2976808302
########## datafusion/physical-optimizer/src/optimizer.rs: ########## @@ -50,7 +50,7 @@ use datafusion_physical_plan::ExecutionPlan; /// `PhysicalOptimizerRule`s. /// /// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule -pub trait PhysicalOptimizerRule: Debug { +pub trait PhysicalOptimizerRule: Debug + std::any::Any { Review Comment: We should probably call this PR out as a breaking API change, as all PhysicalOptimizerRules now need to implement Any. Also, I am not sure it matters, but this isn't consistent with how the other traits in this crate use `Any`. The others add an `as_any()` function, for example: https://github.com/apache/datafusion/blob/0808f3a8d2646c9435557db059759653c3f2c383/datafusion/execution/src/cache/cache_manager.rs#L210-L212 Adding a consistent `as_any` method would, I think, still be a breaking API change and would constrain implementations in the same way, but I do think it would be better to stay consistent with the rest of DataFusion. ########## datafusion/physical-optimizer/src/optimizer.rs: ########## @@ -50,7 +50,7 @@ use datafusion_physical_plan::ExecutionPlan; /// `PhysicalOptimizerRule`s. /// /// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule -pub trait PhysicalOptimizerRule: Debug { +pub trait PhysicalOptimizerRule: Debug + std::any::Any { Review Comment: Though, after some research, it seems this approach might be nicer than the `as_any` methods we use elsewhere, as downstream crates would be less likely to need changes 🤔 (to add the `as_any`) ########## datafusion/ffi/src/tests/physical_optimizer.rs: ########## @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::limit::GlobalLimitExec; + +use crate::physical_optimizer::FFI_PhysicalOptimizerRule; + +/// A rule that wraps the input plan in a GlobalLimitExec with skip=0, fetch=10. Review Comment: 👍 ########## datafusion/ffi/src/physical_optimizer.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ffi::c_void; +use std::sync::Arc; + +use abi_stable::StableAbi; +use abi_stable::std_types::{RResult, RStr}; +use async_trait::async_trait; +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::ExecutionPlan; +use tokio::runtime::Handle; + +use crate::config::FFI_ConfigOptions; +use crate::execution_plan::FFI_ExecutionPlan; +use crate::util::FFIResult; +use crate::{df_result, rresult_return}; + +/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct FFI_PhysicalOptimizerRule { + pub optimize: unsafe extern "C" fn( + &Self, + plan: &FFI_ExecutionPlan, + config: FFI_ConfigOptions, + ) -> FFIResult<FFI_ExecutionPlan>, + + pub name: unsafe extern "C" fn(&Self) -> RStr, + + pub schema_check: unsafe extern "C" fn(&Self) -> bool, + + /// Used to create a clone on the provider of the execution plan. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Return the major DataFusion version number of this provider. + pub version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the plan. + /// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data. + pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. 
+ pub library_marker_id: extern "C" fn() -> usize, +} + +unsafe impl Send for FFI_PhysicalOptimizerRule {} +unsafe impl Sync for FFI_PhysicalOptimizerRule {} + +struct RulePrivateData { + rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>, + runtime: Option<Handle>, +} + +impl FFI_PhysicalOptimizerRule { + fn inner(&self) -> &Arc<dyn PhysicalOptimizerRule + Send + Sync> { + let private_data = self.private_data as *const RulePrivateData; + unsafe { &(*private_data).rule } + } + + fn runtime(&self) -> Option<Handle> { + let private_data = self.private_data as *const RulePrivateData; + unsafe { (*private_data).runtime.clone() } + } +} + +unsafe extern "C" fn optimize_fn_wrapper( + rule: &FFI_PhysicalOptimizerRule, + plan: &FFI_ExecutionPlan, + config: FFI_ConfigOptions, +) -> FFIResult<FFI_ExecutionPlan> { + let runtime = rule.runtime(); + let rule = rule.inner(); + let plan: Arc<dyn ExecutionPlan> = rresult_return!(plan.try_into()); + let config = rresult_return!(ConfigOptions::try_from(config)); + let optimized_plan = rresult_return!(rule.optimize(plan, &config)); + + RResult::ROk(FFI_ExecutionPlan::new(optimized_plan, runtime)) +} + +unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> RStr<'_> { + let rule = rule.inner(); + rule.name().into() +} + +unsafe extern "C" fn schema_check_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> bool { + rule.inner().schema_check() +} + +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalOptimizerRule) { + let private_data = + unsafe { Box::from_raw(provider.private_data as *mut RulePrivateData) }; + drop(private_data); Review Comment: I think we should also set `private_data` to null here, so the struct does not keep a dangling pointer after the drop, similarly to https://github.com/apache/datafusion/blob/6edf4c523ac80af5a11bafba59466d7be068c3f8/datafusion/ffi/src/execution_plan.rs#L191 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
