Copilot commented on code in PR #4779:
URL: https://github.com/apache/datafusion-comet/pull/4779#discussion_r3510199476
##########
native/shuffle/src/metrics.rs:
##########
@@ -28,7 +28,7 @@ pub(crate) struct ShufflePartitionerMetrics {
pub(crate) repart_time: Time,
/// Time spent in `interleave_record_batch` gathering shuffled batches
- pub(crate) interleave_time: Time,
+ // TODO: pub(crate) interleave_time: Time,
Review Comment:
`interleave_time` was removed (commented out) from
`ShufflePartitionerMetrics`, which looks like a regression against the issue
scope that calls out preserving shuffle write metrics. This also means
`PartitionedBatchIterator` no longer records the cost of
`interleave_record_batch`, and tools like
`native/shuffle/src/bin/shuffle_bench.rs` that query `interleave_time` will
stop reporting it.
It would be better to keep the `interleave_time` metric and re-introduce
timing around `interleave_record_batch` even with the new iterator-based API
(for example by storing a `&Time` inside the iterator/producer, or by wrapping
the iterator in a timing adapter).
##########
native/shuffle/src/partitioners/empty_schema.rs:
##########
@@ -85,45 +76,25 @@ impl ShufflePartitioner for EmptySchemaShufflePartitioner {
fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
let start_time = Instant::now();
- let output_data = OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(&self.output_data_file)
- .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?;
- let mut output_data = BufWriter::new(output_data);
+ let mut write_timer = self.metrics.write_time.timer();
Review Comment:
`EmptySchemaShufflePartitioner::shuffle_write` starts a `write_time` timer
around the whole method, but the new `PartitionWriter` implementation
(`LocalPartitionWriter`) already uses `metrics.write_time.timer()` internally
(for spill copy and for final flush/index write). With overlapping timers, the
same elapsed time can be counted more than once, and `write_time` also ends up
including non-write work.
Consider removing this outer `write_time` timer and letting the writer
implementation own write-time accounting.
##########
native/shuffle/src/writers/local/local_partition_writer.rs:
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metrics::ShufflePartitionerMetrics;
+use crate::writers::local::spill::SpillWriter;
+use crate::writers::partition_writer::PartitionWriter;
+use crate::writers::BufBatchWriter;
+use crate::ShuffleBlockWriter;
+use arrow::array::RecordBatch;
+use datafusion::common::DataFusionError;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use std::fs::{File, OpenOptions};
+use std::io::{BufWriter, Seek, Write};
+
+pub(crate) struct LocalPartitionWriter {
+ output_index_file: String,
+ spill_writers: Vec<SpillWriter>,
+ shuffle_block_writer: ShuffleBlockWriter,
+ output_writer: BufWriter<File>,
+ offsets: Vec<u64>,
+ batch_size: usize,
+ write_buffer_size: usize,
+ num_output_partitions: usize,
+ last_finish_pid: i32,
+}
+
+impl LocalPartitionWriter {
+ pub(crate) fn try_new(
+ output_data_file: String,
+ output_index_file: String,
+ shuffle_block_writer: ShuffleBlockWriter,
+ num_output_partitions: usize,
+ batch_size: usize,
+ write_buffer_size: usize,
+ ) -> datafusion::common::Result<Self> {
+ let spill_writers = if num_output_partitions == 1 {
+ vec![]
+ } else {
+ (0..num_output_partitions)
+ .map(|_| {
+ SpillWriter::try_new(
+ shuffle_block_writer.clone(),
+ write_buffer_size,
+ batch_size,
+ )
+ })
+ .collect::<datafusion::common::Result<Vec<_>>>()?
+ };
+ let output_writer = OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(output_data_file.clone())
+ .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?;
+
+ let output_writer = BufWriter::with_capacity(write_buffer_size,
output_writer);
+ Ok(Self {
+ output_index_file,
+ spill_writers,
+ shuffle_block_writer,
+ output_writer,
+ offsets: vec![0u64; num_output_partitions + 1],
+ batch_size,
+ write_buffer_size,
+ num_output_partitions,
+ last_finish_pid: -1,
+ })
+ }
+
+ #[cfg(test)]
+ pub(crate) fn get_spill_writers(&self) -> &Vec<SpillWriter> {
+ &self.spill_writers
+ }
+}
+
+impl PartitionWriter for LocalPartitionWriter {
+ fn spill<I>(
+ &mut self,
+ pid: usize,
+ iter: &mut I,
+ runtime: &RuntimeEnv,
+ metrics: &ShufflePartitionerMetrics,
+ ) -> datafusion::common::Result<()>
+ where
+ I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+ {
+ self.spill_writers[pid]
+ .write(iter, runtime, metrics)
+ .map(|_| ())
+ }
+
+ fn write<I>(
+ &mut self,
+ pid: usize,
+ iter: &mut I,
+ metrics: &ShufflePartitionerMetrics,
+ ) -> datafusion::common::Result<()>
+ where
+ I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+ {
+ assert!(
+ pid == 0 && self.spill_writers.is_empty(),
+ "LocalPartitionWriter::write only for single shuffle partition."
+ );
+
+ let mut buf_batch_writer = BufBatchWriter::new(
+ &mut self.shuffle_block_writer,
+ &mut self.output_writer,
+ self.write_buffer_size,
+ self.batch_size,
+ );
+
+ for batch in iter.by_ref() {
+ let batch = batch?;
+ buf_batch_writer.write(&batch, &metrics.encode_time,
&metrics.write_time)?;
+ }
+ buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
+
+ Ok(())
+ }
+
+ fn finish_partition<I>(
+ &mut self,
+ pid: usize,
+ iter: &mut I,
+ metrics: &ShufflePartitionerMetrics,
+ ) -> datafusion::common::Result<()>
+ where
+ I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+ {
+ assert_eq!(
+ pid as i32 - self.last_finish_pid,
+ 1,
+ "LocalPartitionWriter::finish_partition must be called in order."
+ );
+ self.last_finish_pid = pid as i32;
+
+ self.offsets[pid] = self.output_writer.stream_position()?;
+
+ // if we wrote a spill file for this partition then copy the
+ // contents into the shuffle file
+ if let Some(writer) = self.spill_writers.get(pid) {
+ if let Some(spill_path) = writer.path() {
+ // Use raw File handle (not BufReader) so that std::io::copy
+ // can use copy_file_range/sendfile for zero-copy on Linux.
+ let mut spill_file = File::open(spill_path)?;
+ let mut write_timer = metrics.write_time.timer();
+ std::io::copy(&mut spill_file, &mut self.output_writer)?;
+ write_timer.stop();
+ }
+ }
+
+ // Write in memory batches to output data file
+ let mut buf_batch_writer = BufBatchWriter::new(
+ &mut self.shuffle_block_writer,
+ &mut self.output_writer,
+ self.write_buffer_size,
+ self.batch_size,
+ );
+ for batch in iter.by_ref() {
+ let batch = batch?;
+ buf_batch_writer.write(&batch, &metrics.encode_time,
&metrics.write_time)?;
+ }
+ buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
+ Ok(())
+ }
+
+ fn finish_all(
+ &mut self,
+ metrics: &ShufflePartitionerMetrics,
+ ) -> datafusion::common::Result<()> {
+ let mut write_timer = metrics.write_time.timer();
+ self.output_writer.flush()?;
+ write_timer.stop();
+
+ // add one extra offset at last to ease partition length computation
+ self.offsets[self.num_output_partitions] =
self.output_writer.stream_position()?;
+
+ let mut write_timer = metrics.write_time.timer();
+ let mut output_index = BufWriter::new(
+ File::create(self.output_index_file.clone())
+ .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?,
+ );
+
+ self.offsets.iter().for_each(|offset| {
+ output_index.write_all(&(offset.to_le_bytes()[..])).unwrap();
+ });
Review Comment:
`finish_all` can panic on I/O errors when writing the index file because it
uses `unwrap()` inside `for_each`. This defeats the `Result`-based error
handling and can crash the executor on transient disk errors. Consider
replacing the `for_each` closure with a plain `for` loop so that each
`write_all` error can be propagated with `?`.
Also, the rest of the shuffle code reads index offsets as `i64` (for example
in `shuffle_writer.rs` tests), so writing the offsets as `i64` values here
would keep the on-disk format consistent.
##########
native/shuffle/src/partitioners/single_partition.rs:
##########
@@ -123,20 +98,14 @@ impl ShufflePartitioner for
SinglePartitionShufflePartitioner {
// Write the concatenated buffered batch
if let Some(batch) = concatenated_batch {
- self.output_data_writer.write(
- &batch,
- &self.metrics.encode_time,
- &self.metrics.write_time,
- )?;
+ self.partition_writer
+ .write(0, &mut iter::once(Ok(batch)), &self.metrics)?;
}
if num_rows >= self.batch_size {
// Write the new batch
- self.output_data_writer.write(
- &batch,
- &self.metrics.encode_time,
- &self.metrics.write_time,
- )?;
+ self.partition_writer
+ .write(0, &mut iter::once(Ok(batch)), &self.metrics)?;
} else {
Review Comment:
`SinglePartitionShufflePartitioner` now calls `partition_writer.write(0,
&mut iter::once(...))` for each batch as it streams. With the current
`LocalPartitionWriter::write` implementation creating and flushing a
`BufBatchWriter` per call, this can turn into many small flushes and resets of
the coalescer state.
If possible, consider batching the writes per `insert_batch` (for example by
building a small iterator over the concatenated buffered batch and the incoming
batch when both need to be written) so the writer sees fewer `write()` calls.
##########
native/shuffle/src/writers/partition_writer.rs:
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metrics::ShufflePartitionerMetrics;
+use arrow::record_batch::RecordBatch;
+use datafusion::execution::runtime_env::RuntimeEnv;
+
+#[async_trait::async_trait]
+pub(crate) trait PartitionWriter: Send + Sync {
Review Comment:
`PartitionWriter` is annotated with `#[async_trait::async_trait]` but all
methods are synchronous. Keeping the macro here adds an unnecessary proc-macro
expansion and can be confusing when looking for async behavior.
Unless there is an immediate plan to make these methods `async`, it would be
clearer to drop the attribute for now.
##########
native/shuffle/src/writers/local/local_partition_writer.rs:
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metrics::ShufflePartitionerMetrics;
+use crate::writers::local::spill::SpillWriter;
+use crate::writers::partition_writer::PartitionWriter;
+use crate::writers::BufBatchWriter;
+use crate::ShuffleBlockWriter;
+use arrow::array::RecordBatch;
+use datafusion::common::DataFusionError;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use std::fs::{File, OpenOptions};
+use std::io::{BufWriter, Seek, Write};
+
+pub(crate) struct LocalPartitionWriter {
+ output_index_file: String,
+ spill_writers: Vec<SpillWriter>,
+ shuffle_block_writer: ShuffleBlockWriter,
+ output_writer: BufWriter<File>,
+ offsets: Vec<u64>,
+ batch_size: usize,
+ write_buffer_size: usize,
+ num_output_partitions: usize,
+ last_finish_pid: i32,
+}
+
+impl LocalPartitionWriter {
+ pub(crate) fn try_new(
+ output_data_file: String,
+ output_index_file: String,
+ shuffle_block_writer: ShuffleBlockWriter,
+ num_output_partitions: usize,
+ batch_size: usize,
+ write_buffer_size: usize,
+ ) -> datafusion::common::Result<Self> {
+ let spill_writers = if num_output_partitions == 1 {
+ vec![]
+ } else {
+ (0..num_output_partitions)
+ .map(|_| {
+ SpillWriter::try_new(
+ shuffle_block_writer.clone(),
+ write_buffer_size,
+ batch_size,
+ )
+ })
+ .collect::<datafusion::common::Result<Vec<_>>>()?
+ };
+ let output_writer = OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .open(output_data_file.clone())
+ .map_err(|e| DataFusionError::Execution(format!("shuffle write
error: {e:?}")))?;
+
+ let output_writer = BufWriter::with_capacity(write_buffer_size,
output_writer);
+ Ok(Self {
+ output_index_file,
+ spill_writers,
+ shuffle_block_writer,
+ output_writer,
+ offsets: vec![0u64; num_output_partitions + 1],
+ batch_size,
+ write_buffer_size,
+ num_output_partitions,
+ last_finish_pid: -1,
+ })
+ }
+
+ #[cfg(test)]
+ pub(crate) fn get_spill_writers(&self) -> &Vec<SpillWriter> {
+ &self.spill_writers
+ }
+}
+
+impl PartitionWriter for LocalPartitionWriter {
+ fn spill<I>(
+ &mut self,
+ pid: usize,
+ iter: &mut I,
+ runtime: &RuntimeEnv,
+ metrics: &ShufflePartitionerMetrics,
+ ) -> datafusion::common::Result<()>
+ where
+ I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+ {
+ self.spill_writers[pid]
+ .write(iter, runtime, metrics)
+ .map(|_| ())
+ }
+
+ fn write<I>(
+ &mut self,
+ pid: usize,
+ iter: &mut I,
+ metrics: &ShufflePartitionerMetrics,
+ ) -> datafusion::common::Result<()>
+ where
+ I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+ {
+ assert!(
+ pid == 0 && self.spill_writers.is_empty(),
+ "LocalPartitionWriter::write only for single shuffle partition."
+ );
+
+ let mut buf_batch_writer = BufBatchWriter::new(
+ &mut self.shuffle_block_writer,
+ &mut self.output_writer,
+ self.write_buffer_size,
+ self.batch_size,
+ );
+
+ for batch in iter.by_ref() {
+ let batch = batch?;
+ buf_batch_writer.write(&batch, &metrics.encode_time,
&metrics.write_time)?;
+ }
+ buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
+
Review Comment:
In `LocalPartitionWriter::write` a new `BufBatchWriter` is constructed and
then flushed on every call. For `BufBatchWriter`, `flush()` is not just an I/O
flush. It also finalizes any partially coalesced batches in the internal
`BatchCoalescer` and clears the internal byte buffer.
With the new partitioner code calling `write()` multiple times (often with
`iter::once(...)`), this loses cross-call coalescing and increases flush
frequency compared to the prior design where a single `BufBatchWriter` lived
for the whole shuffle write. It will likely regress performance for
single-partition shuffles.
It may be worth keeping a long-lived `BufBatchWriter` inside
`LocalPartitionWriter` (at least for `pid == 0` single-partition mode) and only
calling `flush()` from `finish_all()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]