alamb commented on a change in pull request #1912: URL: https://github.com/apache/arrow-datafusion/pull/1912#discussion_r820643364
########## File path: ballista/rust/scheduler/src/scheduler_server/mod.rs ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use tokio::sync::RwLock; +use tonic::transport::Channel; + +use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy}; +use ballista_core::error::Result; +use ballista_core::event_loop::EventLoop; +use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; + +use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec}; +use datafusion::prelude::{ExecutionConfig, ExecutionContext}; + +use crate::scheduler_server::event_loop::{ + SchedulerServerEvent, SchedulerServerEventAction, +}; +use crate::state::{ConfigBackendClient, SchedulerState}; + +// include the generated protobuf source as a submodule +#[allow(clippy::all)] +pub mod externalscaler { + include!(concat!(env!("OUT_DIR"), "/externalscaler.rs")); +} + +mod event_loop; +mod external_scaler; +mod grpc; +mod task_scheduler; + +type ExecutorsClient = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>; + +#[derive(Clone)] +pub struct SchedulerServer<T: 'static + 
AsLogicalPlan, U: 'static + AsExecutionPlan> { + pub(crate) state: Arc<SchedulerState<T, U>>, + pub start_time: u128, + policy: TaskSchedulingPolicy, + executors_client: Option<ExecutorsClient>, + event_loop: Option<Arc<EventLoop<SchedulerServerEvent>>>, + ctx: Arc<RwLock<ExecutionContext>>, + codec: BallistaCodec<T, U>, +} + +impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T, U> { + pub fn new( + config: Arc<dyn ConfigBackendClient>, + namespace: String, + ctx: Arc<RwLock<ExecutionContext>>, + codec: BallistaCodec<T, U>, + ) -> Self { + SchedulerServer::new_with_policy( + config, + namespace, + TaskSchedulingPolicy::PullStaged, + ctx, + codec, + ) + } + + pub fn new_with_policy( + config: Arc<dyn ConfigBackendClient>, + namespace: String, + policy: TaskSchedulingPolicy, + ctx: Arc<RwLock<ExecutionContext>>, + codec: BallistaCodec<T, U>, + ) -> Self { + let state = Arc::new(SchedulerState::new(config, namespace, codec.clone())); + + let (executors_client, event_loop) = + if matches!(policy, TaskSchedulingPolicy::PushStaged) { + let executors_client = Arc::new(RwLock::new(HashMap::new())); + let event_action: Arc<SchedulerServerEventAction<T, U>> = + Arc::new(SchedulerServerEventAction::new( + state.clone(), + executors_client.clone(), + )); + let event_loop = + Arc::new(EventLoop::new("scheduler".to_owned(), 10000, event_action)); + (Some(executors_client), Some(event_loop)) + } else { + (None, None) + }; + Self { + state, + start_time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(), + policy, + executors_client, + event_loop, + ctx, + codec, + } + } + + pub async fn init(&mut self) -> Result<()> { + { + // initialize state + let ctx = self.ctx.read().await; + self.state.init(&ctx).await?; + } + + { + if let Some(event_loop) = self.event_loop.as_mut() { + // It's OK here, since we are sure the mutable reference only be used in the initialization + unsafe { Review comment: My personal recommendation is go 
with the `safe` approach (even if it seems slower at first as it may require locks). My rationale is: I have spent countless hours tracking down strange, subtle and hard-to-reproduce bugs related to memory corruption when working on C/C++ systems. My Rust experience has largely been free of such pain and I think the avoidance of that pain is a key advantage of Rust and makes some of the painful parts of Rust (like the borrow checker) worthwhile. I also think the overhead of taking a mutex is likely to be low, and you could make it lower still by using a `RwLock` instead. Even better would be if you can refactor the code so that it is clear when the initialization has occurred so you need neither mutex nor unsafe, perhaps as @thinkharderdev is suggesting. I like to think in most matters I am pretty pragmatic, but the use of `unsafe` is something I do feel quite strongly about. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org