This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new acd98654bd refactor: Convert IPCWriter metrics from u64 to usize (#10278)
acd98654bd is described below
commit acd98654bd86bfadafb76ab99b0e767ec4326bdb
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Mon Apr 29 12:13:37 2024 -0700
refactor: Convert IPCWriter metrics from u64 to usize (#10278)
---
datafusion/physical-plan/src/common.rs | 10 +++++-----
datafusion/physical-plan/src/sorts/sort.rs | 8 ++++----
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs
index f7cad9df4b..cdd122cf36 100644
--- a/datafusion/physical-plan/src/common.rs
+++ b/datafusion/physical-plan/src/common.rs
@@ -259,11 +259,11 @@ pub struct IPCWriter {
/// inner writer
pub writer: FileWriter<File>,
/// batches written
- pub num_batches: u64,
+ pub num_batches: usize,
/// rows written
- pub num_rows: u64,
+ pub num_rows: usize,
/// bytes written
- pub num_bytes: u64,
+ pub num_bytes: usize,
}
impl IPCWriter {
@@ -306,9 +306,9 @@ impl IPCWriter {
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
- self.num_rows += batch.num_rows() as u64;
+ self.num_rows += batch.num_rows();
let num_bytes: usize = batch.get_array_memory_size();
- self.num_bytes += num_bytes as u64;
+ self.num_bytes += num_bytes;
Ok(())
}
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index e4e3d46dfb..ebeaf9e471 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -406,7 +406,7 @@ impl ExternalSorter {
let used = self.reservation.free();
self.metrics.spill_count.add(1);
self.metrics.spilled_bytes.add(used);
- self.metrics.spilled_rows.add(spilled_rows as usize);
+ self.metrics.spilled_rows.add(spilled_rows);
self.spills.push(spill_file);
Ok(used)
}
@@ -674,7 +674,7 @@ async fn spill_sorted_batches(
batches: Vec<RecordBatch>,
path: &Path,
schema: SchemaRef,
-) -> Result<u64> {
+) -> Result<usize> {
let path: PathBuf = path.into();
let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema));
match task.join().await {
@@ -705,7 +705,7 @@ fn write_sorted(
batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
-) -> Result<u64> {
+) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
@@ -715,7 +715,7 @@ fn write_sorted(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
- human_readable_size(writer.num_bytes as usize),
+ human_readable_size(writer.num_bytes),
);
Ok(writer.num_rows)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]