Copilot commented on code in PR #4779:
URL: https://github.com/apache/datafusion-comet/pull/4779#discussion_r3510199476


##########
native/shuffle/src/metrics.rs:
##########
@@ -28,7 +28,7 @@ pub(crate) struct ShufflePartitionerMetrics {
     pub(crate) repart_time: Time,
 
     /// Time spent in `interleave_record_batch` gathering shuffled batches
-    pub(crate) interleave_time: Time,
+    // TODO: pub(crate) interleave_time: Time,
 

Review Comment:
   `interleave_time` was removed (commented out) from 
`ShufflePartitionerMetrics`. That looks like a regression, since the issue 
scope explicitly calls out preserving shuffle write metrics. It also means 
`PartitionedBatchIterator` no longer records the cost of 
`interleave_record_batch`, and tools like 
`native/shuffle/src/bin/shuffle_bench.rs` that query `interleave_time` will 
stop reporting it.
   
   It would be better to keep the `interleave_time` metric and re-introduce 
timing around `interleave_record_batch` even with the new iterator-based API 
(for example by storing a `&Time` inside the iterator/producer, or by wrapping 
the iterator in a timing adapter).
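   
   A minimal sketch of the timing-adapter option, under stated assumptions: `ElapsedNanos` is a hypothetical stand-in for DataFusion's `Time` metric (in the real code the adapter would hold a `&Time` for `interleave_time` and record into it), and `TimedIter` is likewise a hypothetical name. The adapter times every `next()` call, so if the wrapped iterator does the `interleave_record_batch` work lazily inside `next()`, that cost is attributed to the metric without changing the `PartitionWriter` signatures. Since `TimedIter` is itself an `Iterator`, a `&mut TimedIter` can be passed wherever the writer expects `iter: &mut I`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for DataFusion's `Time` metric: total elapsed nanoseconds.
#[derive(Default)]
pub struct ElapsedNanos(AtomicU64);

impl ElapsedNanos {
    /// Adds the time elapsed since `start` to the running total.
    pub fn add_elapsed(&self, start: Instant) {
        let nanos = start.elapsed().as_nanos() as u64;
        self.0.fetch_add(nanos, Ordering::Relaxed);
    }

    pub fn value(&self) -> Duration {
        Duration::from_nanos(self.0.load(Ordering::Relaxed))
    }
}

/// Hypothetical adapter that times each `next()` call of the wrapped iterator,
/// so work performed lazily while producing an item is recorded in `metric`.
pub struct TimedIter<'a, I> {
    inner: I,
    metric: &'a ElapsedNanos,
}

impl<'a, I> TimedIter<'a, I> {
    pub fn new(inner: I, metric: &'a ElapsedNanos) -> Self {
        Self { inner, metric }
    }
}

impl<I: Iterator> Iterator for TimedIter<'_, I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let item = self.inner.next();
        self.metric.add_elapsed(start);
        item
    }
}
```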



##########
native/shuffle/src/partitioners/empty_schema.rs:
##########
@@ -85,45 +76,25 @@ impl ShufflePartitioner for EmptySchemaShufflePartitioner {
     fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
         let start_time = Instant::now();
 
-        let output_data = OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .open(&self.output_data_file)
-            .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
-        let mut output_data = BufWriter::new(output_data);
+        let mut write_timer = self.metrics.write_time.timer();

Review Comment:
   `EmptySchemaShufflePartitioner::shuffle_write` starts a `write_time` timer 
around the whole method, but the new `PartitionWriter` implementation 
(`LocalPartitionWriter`) already uses `metrics.write_time.timer()` internally 
(for spill copy and for final flush/index write). With overlapping timers, the 
same elapsed time can be counted more than once, and `write_time` also ends up 
including non-write work.
   
   Consider removing this outer `write_time` timer and letting the writer 
implementation own write-time accounting.
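   
   A small sketch of why nested timers inflate the metric, under stated assumptions: `WriteTime` is a hypothetical single-threaded stand-in for DataFusion's `Time`, and `writer_flush` simulates the writer's internally timed I/O with a sleep. Because the outer interval fully contains the inner one, the nested variant records at least twice the simulated I/O time.

```rust
use std::cell::Cell;
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical single-threaded stand-in for the `write_time` metric.
#[derive(Default)]
pub struct WriteTime(Cell<Duration>);

impl WriteTime {
    /// Adds the time elapsed since `start` to the running total.
    pub fn add_elapsed(&self, start: Instant) {
        self.0.set(self.0.get() + start.elapsed());
    }

    pub fn value(&self) -> Duration {
        self.0.get()
    }
}

/// Stand-in for a writer (like `LocalPartitionWriter`) that times its own I/O.
pub fn writer_flush(write_time: &WriteTime, io: Duration) {
    let start = Instant::now();
    sleep(io); // simulated file write
    write_time.add_elapsed(start);
}

/// Outer timer around the writer call: the writer's interval is counted twice.
pub fn shuffle_write_nested(write_time: &WriteTime, io: Duration) {
    let start = Instant::now();
    writer_flush(write_time, io);
    write_time.add_elapsed(start);
}

/// The writer owns write-time accounting; the partitioner adds no timer.
pub fn shuffle_write(write_time: &WriteTime, io: Duration) {
    writer_flush(write_time, io);
}
```

   With the outer timer dropped (as in `shuffle_write`), `write_time` reflects only the intervals the writer itself measures.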



##########
native/shuffle/src/writers/local/local_partition_writer.rs:
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metrics::ShufflePartitionerMetrics;
+use crate::writers::local::spill::SpillWriter;
+use crate::writers::partition_writer::PartitionWriter;
+use crate::writers::BufBatchWriter;
+use crate::ShuffleBlockWriter;
+use arrow::array::RecordBatch;
+use datafusion::common::DataFusionError;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use std::fs::{File, OpenOptions};
+use std::io::{BufWriter, Seek, Write};
+
+pub(crate) struct LocalPartitionWriter {
+    output_index_file: String,
+    spill_writers: Vec<SpillWriter>,
+    shuffle_block_writer: ShuffleBlockWriter,
+    output_writer: BufWriter<File>,
+    offsets: Vec<u64>,
+    batch_size: usize,
+    write_buffer_size: usize,
+    num_output_partitions: usize,
+    last_finish_pid: i32,
+}
+
+impl LocalPartitionWriter {
+    pub(crate) fn try_new(
+        output_data_file: String,
+        output_index_file: String,
+        shuffle_block_writer: ShuffleBlockWriter,
+        num_output_partitions: usize,
+        batch_size: usize,
+        write_buffer_size: usize,
+    ) -> datafusion::common::Result<Self> {
+        let spill_writers = if num_output_partitions == 1 {
+            vec![]
+        } else {
+            (0..num_output_partitions)
+                .map(|_| {
+                    SpillWriter::try_new(
+                        shuffle_block_writer.clone(),
+                        write_buffer_size,
+                        batch_size,
+                    )
+                })
+                .collect::<datafusion::common::Result<Vec<_>>>()?
+        };
+        let output_writer = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(output_data_file.clone())
+            .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
+
+        let output_writer = BufWriter::with_capacity(write_buffer_size, output_writer);
+        Ok(Self {
+            output_index_file,
+            spill_writers,
+            shuffle_block_writer,
+            output_writer,
+            offsets: vec![0u64; num_output_partitions + 1],
+            batch_size,
+            write_buffer_size,
+            num_output_partitions,
+            last_finish_pid: -1,
+        })
+    }
+
+    #[cfg(test)]
+    pub(crate) fn get_spill_writers(&self) -> &Vec<SpillWriter> {
+        &self.spill_writers
+    }
+}
+
+impl PartitionWriter for LocalPartitionWriter {
+    fn spill<I>(
+        &mut self,
+        pid: usize,
+        iter: &mut I,
+        runtime: &RuntimeEnv,
+        metrics: &ShufflePartitionerMetrics,
+    ) -> datafusion::common::Result<()>
+    where
+        I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+    {
+        self.spill_writers[pid]
+            .write(iter, runtime, metrics)
+            .map(|_| ())
+    }
+
+    fn write<I>(
+        &mut self,
+        pid: usize,
+        iter: &mut I,
+        metrics: &ShufflePartitionerMetrics,
+    ) -> datafusion::common::Result<()>
+    where
+        I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+    {
+        assert!(
+            pid == 0 && self.spill_writers.is_empty(),
+            "LocalPartitionWriter::write only for single shuffle partition."
+        );
+
+        let mut buf_batch_writer = BufBatchWriter::new(
+            &mut self.shuffle_block_writer,
+            &mut self.output_writer,
+            self.write_buffer_size,
+            self.batch_size,
+        );
+
+        for batch in iter.by_ref() {
+            let batch = batch?;
+            buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?;
+        }
+        buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
+
+        Ok(())
+    }
+
+    fn finish_partition<I>(
+        &mut self,
+        pid: usize,
+        iter: &mut I,
+        metrics: &ShufflePartitionerMetrics,
+    ) -> datafusion::common::Result<()>
+    where
+        I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+    {
+        assert_eq!(
+            pid as i32 - self.last_finish_pid,
+            1,
+            "LocalPartitionWriter::finish_partition must be called in order."
+        );
+        self.last_finish_pid = pid as i32;
+
+        self.offsets[pid] = self.output_writer.stream_position()?;
+
+        // if we wrote a spill file for this partition then copy the
+        // contents into the shuffle file
+        if let Some(writer) = self.spill_writers.get(pid) {
+            if let Some(spill_path) = writer.path() {
+                // Use raw File handle (not BufReader) so that std::io::copy
+                // can use copy_file_range/sendfile for zero-copy on Linux.
+                let mut spill_file = File::open(spill_path)?;
+                let mut write_timer = metrics.write_time.timer();
+                std::io::copy(&mut spill_file, &mut self.output_writer)?;
+                write_timer.stop();
+            }
+        }
+
+        // Write in memory batches to output data file
+        let mut buf_batch_writer = BufBatchWriter::new(
+            &mut self.shuffle_block_writer,
+            &mut self.output_writer,
+            self.write_buffer_size,
+            self.batch_size,
+        );
+        for batch in iter.by_ref() {
+            let batch = batch?;
+            buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?;
+        }
+        buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
+        Ok(())
+    }
+
+    fn finish_all(
+        &mut self,
+        metrics: &ShufflePartitionerMetrics,
+    ) -> datafusion::common::Result<()> {
+        let mut write_timer = metrics.write_time.timer();
+        self.output_writer.flush()?;
+        write_timer.stop();
+
+        // add one extra offset at last to ease partition length computation
+        self.offsets[self.num_output_partitions] = self.output_writer.stream_position()?;
+
+        let mut write_timer = metrics.write_time.timer();
+        let mut output_index = BufWriter::new(
+            File::create(self.output_index_file.clone())
+                .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?,
+        );
+
+        self.offsets.iter().for_each(|offset| {
+            output_index.write_all(&(offset.to_le_bytes()[..])).unwrap();
+        });

Review Comment:
   `finish_all` can panic on I/O errors when writing the index file because it 
uses `unwrap()` inside `for_each`. This defeats the `Result`-based error 
handling and can crash the executor on transient disk errors. Writing each 
offset in a plain `for` loop and propagating failures with `?` avoids this.
   
   Also, the rest of the shuffle code reads index offsets as `i64` (for example 
in `shuffle_writer.rs` tests), so writing the offsets as `i64` here would keep 
the on-disk format consistent.
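   
   A minimal sketch of the error-propagating version, written against plain `std::io` so it stands alone; `write_index` is a hypothetical helper, not an existing function in the crate. Inside `finish_all`, `?` on an `io::Error` would convert into `DataFusionError` the same way the existing `stream_position()?` calls in the diff already do. The range check is one possible way to reject offsets that do not fit in `i64`; for offsets in range, the little-endian bytes are identical to the current `u64` encoding.

```rust
use std::io::{self, Write};

/// Writes shuffle index offsets as little-endian `i64` values, propagating
/// I/O errors with `?` instead of panicking via `unwrap()`.
pub fn write_index<W: Write>(out: &mut W, offsets: &[u64]) -> io::Result<()> {
    for &offset in offsets {
        // An offset above i64::MAX cannot be represented in the i64 format.
        if offset > i64::MAX as u64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("shuffle index offset {offset} exceeds i64::MAX"),
            ));
        }
        out.write_all(&(offset as i64).to_le_bytes())?;
    }
    out.flush()
}
```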



##########
native/shuffle/src/partitioners/single_partition.rs:
##########
@@ -123,20 +98,14 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
 
                 // Write the concatenated buffered batch
                 if let Some(batch) = concatenated_batch {
-                    self.output_data_writer.write(
-                        &batch,
-                        &self.metrics.encode_time,
-                        &self.metrics.write_time,
-                    )?;
+                    self.partition_writer
+                        .write(0, &mut iter::once(Ok(batch)), &self.metrics)?;
                 }
 
                 if num_rows >= self.batch_size {
                     // Write the new batch
-                    self.output_data_writer.write(
-                        &batch,
-                        &self.metrics.encode_time,
-                        &self.metrics.write_time,
-                    )?;
+                    self.partition_writer
+                        .write(0, &mut iter::once(Ok(batch)), &self.metrics)?;
                 } else {

Review Comment:
   `SinglePartitionShufflePartitioner` now calls `partition_writer.write(0, 
&mut iter::once(...))` for each batch as it streams. With the current 
`LocalPartitionWriter::write` implementation creating and flushing a 
`BufBatchWriter` per call, this can turn into many small flushes and resets of 
the coalescer state.
   
   If possible, consider batching the writes per `insert_batch` (for example by 
building a small iterator over the concatenated buffered batch and the incoming 
batch when both need to be written) so the writer sees fewer `write()` calls.
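   
   A rough sketch of that batching with simplified stand-ins: `Vec<i32>` plays the role of `RecordBatch`, `String` replaces DataFusion's error type, and `CountingWriter` and `write_pending` are hypothetical names that only count `write()` calls. When the incoming batch is large enough to be written directly, chaining it after the concatenated buffer lets a single `write()` call carry both.

```rust
/// Hypothetical partition writer that only counts calls and rows.
#[derive(Default)]
pub struct CountingWriter {
    pub calls: usize,
    pub rows: usize,
}

impl CountingWriter {
    pub fn write<I>(&mut self, iter: &mut I) -> Result<(), String>
    where
        I: Iterator<Item = Result<Vec<i32>, String>>,
    {
        self.calls += 1;
        for batch in iter {
            self.rows += batch?.len();
        }
        Ok(())
    }
}

/// Writes the concatenated buffer and, if it is large enough, the incoming
/// batch through one `write()` call. Returns the batch to keep buffered, if any.
pub fn write_pending(
    writer: &mut CountingWriter,
    concatenated: Option<Vec<i32>>,
    incoming: Vec<i32>,
    batch_size: usize,
) -> Result<Option<Vec<i32>>, String> {
    if incoming.len() >= batch_size {
        let mut batches = concatenated
            .into_iter()
            .chain(std::iter::once(incoming))
            .map(Ok::<Vec<i32>, String>);
        writer.write(&mut batches)?;
        Ok(None)
    } else {
        if let Some(batch) = concatenated {
            writer.write(&mut std::iter::once(Ok::<_, String>(batch)))?;
        }
        // Too small to write on its own: keep it buffered.
        Ok(Some(incoming))
    }
}
```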



##########
native/shuffle/src/writers/partition_writer.rs:
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metrics::ShufflePartitionerMetrics;
+use arrow::record_batch::RecordBatch;
+use datafusion::execution::runtime_env::RuntimeEnv;
+
+#[async_trait::async_trait]
+pub(crate) trait PartitionWriter: Send + Sync {

Review Comment:
   `PartitionWriter` is annotated with `#[async_trait::async_trait]` but all 
methods are synchronous. Keeping the macro here adds an unnecessary proc-macro 
expansion and can be confusing when looking for async behavior.
   
   Unless there is an immediate plan to make these methods `async`, it would be 
clearer to drop the attribute for now.
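   
   A minimal sketch of the trait with the attribute dropped, using simplified stand-ins (`Batch` for `RecordBatch`, `String` for DataFusion's error type) and showing only `write`; `MemoryWriter` is a hypothetical implementation used to exercise it. The signatures are plain synchronous generic methods, so nothing in them needs the macro.

```rust
/// Simplified stand-in for `RecordBatch`.
pub type Batch = Vec<i32>;

/// Synchronous trait, no `#[async_trait]` attribute.
pub trait PartitionWriter: Send + Sync {
    fn write<I>(&mut self, pid: usize, iter: &mut I) -> Result<(), String>
    where
        I: Iterator<Item = Result<Batch, String>>;
}

/// Hypothetical in-memory implementation that tallies rows per partition.
#[derive(Default)]
pub struct MemoryWriter {
    pub rows_per_partition: Vec<usize>,
}

impl PartitionWriter for MemoryWriter {
    fn write<I>(&mut self, pid: usize, iter: &mut I) -> Result<(), String>
    where
        I: Iterator<Item = Result<Batch, String>>,
    {
        if self.rows_per_partition.len() <= pid {
            self.rows_per_partition.resize(pid + 1, 0);
        }
        for batch in iter {
            self.rows_per_partition[pid] += batch?.len();
        }
        Ok(())
    }
}
```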



##########
native/shuffle/src/writers/local/local_partition_writer.rs:
##########
@@ -0,0 +1,207 @@
+    fn write<I>(
+        &mut self,
+        pid: usize,
+        iter: &mut I,
+        metrics: &ShufflePartitionerMetrics,
+    ) -> datafusion::common::Result<()>
+    where
+        I: Iterator<Item = datafusion::common::Result<RecordBatch>>,
+    {
+        assert!(
+            pid == 0 && self.spill_writers.is_empty(),
+            "LocalPartitionWriter::write only for single shuffle partition."
+        );
+
+        let mut buf_batch_writer = BufBatchWriter::new(
+            &mut self.shuffle_block_writer,
+            &mut self.output_writer,
+            self.write_buffer_size,
+            self.batch_size,
+        );
+
+        for batch in iter.by_ref() {
+            let batch = batch?;
+            buf_batch_writer.write(&batch, &metrics.encode_time, &metrics.write_time)?;
+        }
+        buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
+

Review Comment:
   In `LocalPartitionWriter::write` a new `BufBatchWriter` is constructed and 
then flushed on every call. For `BufBatchWriter`, `flush()` is not just an I/O 
flush. It also finalizes any partially coalesced batches in the internal 
`BatchCoalescer` and clears the internal byte buffer.
   
   With the new partitioner code calling `write()` multiple times (often with 
`iter::once(...)`), this loses cross-call coalescing and increases flush 
frequency compared to the prior design where a single `BufBatchWriter` lived 
for the whole shuffle write. It will likely regress performance for 
single-partition shuffles.
   
   It may be worth keeping a long-lived `BufBatchWriter` inside 
`LocalPartitionWriter` (at least for `pid == 0` single-partition mode) and only 
calling `flush()` from `finish_all()`.
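   
   A simplified, byte-level analogy of the difference, assuming nothing about `BufBatchWriter` internals beyond what is described above: `CoalescingWriter` is a hypothetical type whose `pending` bytes stand in for the coalescer state. One long-lived instance flushes only when the target size is reached (and once more at finish), while constructing and finishing one instance per call flushes on every call.

```rust
use std::io::{self, Write};

/// Hypothetical writer that buffers small writes and flushes them in larger
/// chunks; `pending` stands in for partially coalesced batch state.
pub struct CoalescingWriter<W: Write> {
    out: W,
    pending: Vec<u8>,
    target: usize,
    flushes: usize,
}

impl<W: Write> CoalescingWriter<W> {
    pub fn new(out: W, target: usize) -> Self {
        Self { out, pending: Vec::new(), target, flushes: 0 }
    }

    /// Buffers `bytes`, flushing once at least `target` bytes are pending.
    pub fn write(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.pending.extend_from_slice(bytes);
        if self.pending.len() >= self.target {
            self.flush()?;
        }
        Ok(())
    }

    /// Writes out any pending bytes and flushes the underlying writer.
    pub fn flush(&mut self) -> io::Result<()> {
        if !self.pending.is_empty() {
            self.out.write_all(&self.pending)?;
            self.pending.clear();
            self.flushes += 1;
        }
        self.out.flush()
    }

    /// Final flush (the `finish_all()` analogue); returns the inner writer and
    /// the number of non-empty flushes performed.
    pub fn finish(mut self) -> io::Result<(W, usize)> {
        self.flush()?;
        Ok((self.out, self.flushes))
    }
}
```

   Because `BufBatchWriter::new` mutably borrows `self.shuffle_block_writer` and `self.output_writer` in the diff, keeping it as a field of `LocalPartitionWriter` would likely need a variant that owns (or is generic over) those writers; that is an assumption about the refactor, not something the diff shows.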



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
