milenkovicm commented on code in PR #1389: URL: https://github.com/apache/datafusion-ballista/pull/1389#discussion_r2702371230
########## ballista/client/tests/sort_shuffle.rs: ########## @@ -0,0 +1,450 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end integration tests for sort-based shuffle. +//! +//! These tests verify that the sort-based shuffle implementation produces +//! correct results for various query patterns that involve shuffling. + +mod common; + +#[cfg(test)] +#[cfg(feature = "standalone")] +mod sort_shuffle_tests { + use ballista::prelude::{SessionConfigExt, SessionContextExt}; + use ballista_core::config::{ + BALLISTA_SHUFFLE_SORT_BASED_BUFFER_SIZE, BALLISTA_SHUFFLE_SORT_BASED_ENABLED, + BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT, + }; + use datafusion::arrow::util::pretty::pretty_format_batches; + use datafusion::common::Result; + use datafusion::execution::SessionStateBuilder; + use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; + use std::collections::HashSet; + + /// Creates a standalone session context with sort-based shuffle enabled. + async fn create_sort_shuffle_context() -> SessionContext { + let config = SessionConfig::new_with_ballista() + .set_str(BALLISTA_SHUFFLE_SORT_BASED_ENABLED, "true") + .set_str(BALLISTA_SHUFFLE_SORT_BASED_BUFFER_SIZE, "1048576") // 1MB + .set_str(BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT, "268435456"); // 256MB Review Comment: also, one note, #1318 introduces block transfer rather than flight transfer to override and fail back to flight transport `.set_str(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, "true")` is needed. I believe we would need to patch flight server to support transport of sorted shuffles ########## ballista/core/src/execution_plans/sort_shuffle/reader.rs: ########## @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reader for sort-based shuffle output files. +//! +//! Reads partition data from the consolidated data file using the index +//! file to locate partition boundaries. Uses Arrow IPC FileReader for +//! efficient random access to specific batches. + +use crate::error::{BallistaError, Result}; +use datafusion::arrow::ipc::reader::FileReader; +use datafusion::arrow::record_batch::RecordBatch; +use std::fs::File; +use std::path::Path; + +use super::index::ShuffleIndex; + +/// Checks if a shuffle output uses the sort-based format by looking for +/// the index file. +pub fn is_sort_shuffle_output(data_path: &Path) -> bool { + let index_path = data_path.with_extension("arrow.index"); + index_path.exists() +} + +/// Gets the index file path for a data file. +pub fn get_index_path(data_path: &Path) -> std::path::PathBuf { + data_path.with_extension("arrow.index") +} + +/// Reads all batches for a specific partition from a sort shuffle data file. +/// +/// Uses Arrow IPC FileReader for efficient random access - directly reads +/// only the batches belonging to the requested partition without scanning +/// through preceding data. +/// +/// # Arguments +/// * `data_path` - Path to the consolidated data file (Arrow IPC File format) +/// * `index_path` - Path to the index file +/// * `partition_id` - The partition to read +/// +/// # Returns +/// Vector of record batches for the requested partition. +pub fn read_sort_shuffle_partition( + data_path: &Path, + index_path: &Path, + partition_id: usize, +) -> Result<Vec<RecordBatch>> { + // Load the index + let index = ShuffleIndex::read_from_file(index_path)?; + + if partition_id >= index.partition_count() { + return Err(BallistaError::General(format!( + "Partition {partition_id} not found in index (max: {})", + index.partition_count() + ))); + } + + // Check if partition has data + if !index.partition_has_data(partition_id) { + return Ok(Vec::new()); + } + + // Get the batch range for this partition from the index + // The index stores cumulative batch counts: + // - offset[i] = starting batch index for partition i + // - offset[i+1] (or total_length for last partition) = ending batch index (exclusive) + let (start_batch, end_batch) = index.get_partition_range(partition_id); + let start_batch = start_batch as usize; + let end_batch = end_batch as usize; + + // Open the data file with FileReader for random access + let file = File::open(data_path).map_err(BallistaError::IoError)?; + let mut reader = FileReader::try_new(file, None)?; + + let mut batches = Vec::with_capacity(end_batch - start_batch); + + // Use FileReader's set_index() for random access to specific batches + // This positions the reader directly at the starting batch index + reader.set_index(start_batch)?; + + // Read only the batches we need for this partition + for _ in start_batch..end_batch { + match reader.next() { + Some(Ok(batch)) => batches.push(batch), + Some(Err(e)) => return Err(e.into()), + None => break, + } + } + + Ok(batches) +} + +/// Reads all batches from a sort shuffle data file. +/// +/// # Arguments +/// * `data_path` - Path to the consolidated data file (Arrow IPC File format) +/// +/// # Returns +/// Vector of all record batches in the file. +pub fn read_all_batches(data_path: &Path) -> Result<Vec<RecordBatch>> { Review Comment: Would it make sense to return a stream instead of loading all vectors in a single vector ########## ballista/core/src/execution_plans/sort_shuffle/mod.rs: ########## @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort-based shuffle implementation for Ballista. +//! +//! This module provides an alternative to the hash-based shuffle that writes +//! a single consolidated file per input partition (sorted by output partition ID) +//! along with an index file mapping partition IDs to byte offsets. +//! Review Comment: maybe we could add paragraph from referred spark documentation, I personally find it as good algorithm outline: "Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks." ########## ballista/core/src/execution_plans/sort_shuffle/mod.rs: ########## @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort-based shuffle implementation for Ballista. +//! +//! This module provides an alternative to the hash-based shuffle that writes Review Comment: I would suggest change from: "This module provides an alternative to the hash-based shuffle that writes" to "This module provides an alternative to the hash-based shuffle. It writes" I personally find it easier to understand, but it's a personal option ########## ballista/core/src/execution_plans/shuffle_reader.rs: ########## @@ -525,7 +528,58 @@ async fn fetch_partition_local( let path = &location.path; let metadata = &location.executor_meta; let partition_id = &location.partition_id; + let data_path = std::path::Path::new(path); + + // Check if this is a sort-based shuffle output (has index file) + if is_sort_shuffle_output(data_path) { + debug!( + "Reading sort-based shuffle for partition {} from {:?}", + partition_id.partition_id, data_path + ); + let index_path = get_index_path(data_path); + let batches = read_sort_shuffle_partition( + data_path, + &index_path, + partition_id.partition_id, + ) + .map_err(|e| { + BallistaError::FetchFailed( + metadata.id.clone(), + partition_id.stage_id, + partition_id.partition_id, + e.to_string(), + ) + })?; + + // Create a stream from the batches + let schema = if let Some(first_batch) = batches.first() { + first_batch.schema() + } else { + // Empty partition - we need to get schema from the file + let file = File::open(data_path).map_err(|e| { + BallistaError::FetchFailed( + metadata.id.clone(), + partition_id.stage_id, + partition_id.partition_id, + e.to_string(), + ) + })?; + let reader = StreamReader::try_new(file, None).map_err(|e| { + BallistaError::FetchFailed( + metadata.id.clone(), + partition_id.stage_id, + partition_id.partition_id, + e.to_string(), + ) + })?; + reader.schema() + }; + + let stream = futures::stream::iter(batches.into_iter().map(Ok)); Review Comment: IMHO, getting stream from reader instead of all batches may make sense ########## ballista/client/tests/sort_shuffle.rs: ########## @@ -0,0 +1,450 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end integration tests for sort-based shuffle. +//! +//! These tests verify that the sort-based shuffle implementation produces +//! correct results for various query patterns that involve shuffling. + +mod common; + +#[cfg(test)] +#[cfg(feature = "standalone")] +mod sort_shuffle_tests { + use ballista::prelude::{SessionConfigExt, SessionContextExt}; + use ballista_core::config::{ + BALLISTA_SHUFFLE_SORT_BASED_BUFFER_SIZE, BALLISTA_SHUFFLE_SORT_BASED_ENABLED, + BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT, + }; + use datafusion::arrow::util::pretty::pretty_format_batches; + use datafusion::common::Result; + use datafusion::execution::SessionStateBuilder; + use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; + use std::collections::HashSet; + + /// Creates a standalone session context with sort-based shuffle enabled. + async fn create_sort_shuffle_context() -> SessionContext { + let config = SessionConfig::new_with_ballista() + .set_str(BALLISTA_SHUFFLE_SORT_BASED_ENABLED, "true") + .set_str(BALLISTA_SHUFFLE_SORT_BASED_BUFFER_SIZE, "1048576") // 1MB + .set_str(BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT, "268435456"); // 256MB Review Comment: adding `.set_str(BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ, "true")` will force remote read using flight. at the moment tests will fail if set. I would suggest using `rstest` to test local and remote read (contex with and without this option set) ########## ballista/core/src/execution_plans/sort_shuffle/spill.rs: ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Spill manager for sort-based shuffle. +//! +//! Handles writing partition buffers to disk when memory pressure is high, +//! and reading them back during the finalization phase. + +use crate::error::{BallistaError, Result}; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::ipc::reader::StreamReader; +use datafusion::arrow::ipc::writer::StreamWriter; +use datafusion::arrow::ipc::{CompressionType, writer::IpcWriteOptions}; +use datafusion::arrow::record_batch::RecordBatch; +use log::debug; +use std::collections::HashMap; +use std::fs::File; +use std::io::BufWriter; +use std::path::PathBuf; + +/// Manages spill files for sort-based shuffle. +/// +/// When partition buffers exceed memory limits, they are spilled to disk +/// as Arrow IPC files. During finalization, these spill files are read +/// back and merged into the consolidated output file. +#[derive(Debug)] +pub struct SpillManager { + /// Base directory for spill files + spill_dir: PathBuf, + /// Spill files per output partition: partition_id -> Vec<spill_file_path> + spill_files: HashMap<usize, Vec<PathBuf>>, + /// Counter for generating unique spill file names + spill_counter: usize, + /// Compression codec for spill files + compression: CompressionType, + /// Total number of spills performed + total_spills: usize, + /// Total bytes spilled + total_bytes_spilled: u64, +} + +impl SpillManager { + /// Creates a new spill manager. + /// + /// # Arguments + /// * `work_dir` - Base work directory + /// * `job_id` - Job identifier + /// * `stage_id` - Stage identifier + /// * `input_partition` - Input partition number + /// * `compression` - Compression codec for spill files + pub fn new( + work_dir: &str, + job_id: &str, + stage_id: usize, + input_partition: usize, + compression: CompressionType, + ) -> Result<Self> { + let mut spill_dir = PathBuf::from(work_dir); + spill_dir.push(job_id); + spill_dir.push(format!("{stage_id}")); + spill_dir.push(format!("{input_partition}")); + spill_dir.push("spill"); + + // Create spill directory + std::fs::create_dir_all(&spill_dir).map_err(BallistaError::IoError)?; + + Ok(Self { + spill_dir, + spill_files: HashMap::new(), + spill_counter: 0, + compression, + total_spills: 0, + total_bytes_spilled: 0, + }) + } + + /// Spills batches for a partition to disk. + /// + /// Returns the number of bytes written. + pub fn spill( + &mut self, + partition_id: usize, + batches: Vec<RecordBatch>, + schema: &SchemaRef, + ) -> Result<u64> { + if batches.is_empty() { + return Ok(0); + } + + let spill_path = self.next_spill_path(partition_id); + debug!( + "Spilling {} batches for partition {} to {:?}", + batches.len(), + partition_id, + spill_path + ); + + let file = File::create(&spill_path).map_err(BallistaError::IoError)?; + let buffered = BufWriter::new(file); + + let options = + IpcWriteOptions::default().try_with_compression(Some(self.compression))?; + + let mut writer = StreamWriter::try_new_with_options(buffered, schema, options)?; + + for batch in &batches { + writer.write(batch)?; + } + + writer.finish()?; + + let bytes_written = std::fs::metadata(&spill_path) + .map_err(BallistaError::IoError)? + .len(); + + // Track the spill file + self.spill_files + .entry(partition_id) + .or_default() + .push(spill_path); + + self.total_spills += 1; + self.total_bytes_spilled += bytes_written; + + Ok(bytes_written) + } + + /// Returns the spill files for a partition. + pub fn get_spill_files(&self, partition_id: usize) -> &[PathBuf] { + self.spill_files + .get(&partition_id) + .map(|v| v.as_slice()) + .unwrap_or(&[]) + } + + /// Returns true if the partition has spill files. + pub fn has_spill_files(&self, partition_id: usize) -> bool { + self.spill_files + .get(&partition_id) + .is_some_and(|v| !v.is_empty()) + } + + /// Reads all spill files for a partition and returns the batches. + pub fn read_spill_files(&self, partition_id: usize) -> Result<Vec<RecordBatch>> { Review Comment: would it make sense to avoid vector and use stream? ########## ballista/core/src/execution_plans/sort_shuffle/reader.rs: ########## @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reader for sort-based shuffle output files. +//! +//! Reads partition data from the consolidated data file using the index +//! file to locate partition boundaries. Uses Arrow IPC FileReader for +//! efficient random access to specific batches. + +use crate::error::{BallistaError, Result}; +use datafusion::arrow::ipc::reader::FileReader; +use datafusion::arrow::record_batch::RecordBatch; +use std::fs::File; +use std::path::Path; + +use super::index::ShuffleIndex; + +/// Checks if a shuffle output uses the sort-based format by looking for +/// the index file. +pub fn is_sort_shuffle_output(data_path: &Path) -> bool { + let index_path = data_path.with_extension("arrow.index"); + index_path.exists() +} + +/// Gets the index file path for a data file. +pub fn get_index_path(data_path: &Path) -> std::path::PathBuf { + data_path.with_extension("arrow.index") +} + +/// Reads all batches for a specific partition from a sort shuffle data file. +/// +/// Uses Arrow IPC FileReader for efficient random access - directly reads +/// only the batches belonging to the requested partition without scanning +/// through preceding data. +/// +/// # Arguments +/// * `data_path` - Path to the consolidated data file (Arrow IPC File format) +/// * `index_path` - Path to the index file +/// * `partition_id` - The partition to read +/// +/// # Returns +/// Vector of record batches for the requested partition. +pub fn read_sort_shuffle_partition( + data_path: &Path, + index_path: &Path, + partition_id: usize, +) -> Result<Vec<RecordBatch>> { Review Comment: Would it make sense to return a stream instead of loading all vectors in a single vector? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
