Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-20 Thread via GitHub


2010YOUY01 commented on PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#issuecomment-2990228423

   > > 'here are the API changes that might break your system during upgrades', 
and this PR is like a new feature you might want to try in the new release -- 
do we have a separate place to document new features like this? 🤔
   > 
   > I was thinking the upgrade doc contains everything we want to highlight 
and make users aware of
   
   I see. Maybe we can put new features and API changes into different 
subsections.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-20 Thread via GitHub


xudong963 commented on PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#issuecomment-2990161090

   > 'here are the API changes that might break your system during upgrades', 
and this PR is like a new feature you might want to try in the new release -- 
do we have a separate place to document new features like this? 🤔
   
   I was thinking the upgrade doc contains everything we want to highlight 
and make users aware of





Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-20 Thread via GitHub


2010YOUY01 merged PR #16268:
URL: https://github.com/apache/datafusion/pull/16268





Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-19 Thread via GitHub


2010YOUY01 commented on PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#issuecomment-2988025104

   > It's better to mention the option in DF49 upgrade doc
   
   Thank you for the reminder @xudong963 .
   
   I have a question: as I understand it, the upgrade guide is like 'here are 
the API changes that might break your system during upgrades', and this PR is 
like a new feature you might want to try in the new release -- do we have a 
separate place to document new features like this? 🤔





Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-18 Thread via GitHub


ding-young commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2156147905


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -258,6 +259,8 @@ impl ExternalSorter {
 batch_size: usize,
 sort_spill_reservation_bytes: usize,
 sort_in_place_threshold_bytes: usize,
+// Configured via `datafusion.execution.spill_compression`.
+spill_compression: SpillCompression,

Review Comment:
   Here, too






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-18 Thread via GitHub


ding-young commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2156147684


##
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##
@@ -1324,6 +1326,8 @@ impl Stream for SortMergeJoinStream {
 impl SortMergeJoinStream {
 #[allow(clippy::too_many_arguments)]
 pub fn try_new(
+// Configured via `datafusion.execution.spill_compression`.
+spill_compression: SpillCompression,

Review Comment:
   I made the changes. Thank you.  






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-18 Thread via GitHub


2010YOUY01 commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2154095345


##
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##
@@ -1324,6 +1326,7 @@ impl Stream for SortMergeJoinStream {
 impl SortMergeJoinStream {
 #[allow(clippy::too_many_arguments)]
 pub fn try_new(
+spill_compression: SpillCompression,

Review Comment:
   Nit: For readability, I suggest adding a comment indicating that this 
argument is passed through from the configuration xxx.
   In the future, we might consider enforcing a naming convention for such 
arguments—e.g., always using a `cfg_` prefix like `cfg_spill_compression`.



##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -258,6 +259,7 @@ impl ExternalSorter {
 batch_size: usize,
 sort_spill_reservation_bytes: usize,
 sort_in_place_threshold_bytes: usize,
+spill_compression: SpillCompression,

Review Comment:
   Same here.






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-18 Thread via GitHub


2010YOUY01 closed pull request #16268: Add compression option to SpillManager
URL: https://github.com/apache/datafusion/pull/16268





Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-18 Thread via GitHub


2010YOUY01 commented on PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#issuecomment-2983191181

   close and reopen to trigger CI again





Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-18 Thread via GitHub


2010YOUY01 closed pull request #16268: Add compression option to SpillManager
URL: https://github.com/apache/datafusion/pull/16268





Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-17 Thread via GitHub


ding-young commented on PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#issuecomment-2982326469

   CI currently fails, but I think that is due to a change introduced in 
another PR.





Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-17 Thread via GitHub


ding-young commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2153450196


##
datafusion/common/src/config.rs:
##
@@ -274,6 +276,61 @@ config_namespace! {
 }
 }
 
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+pub enum SpillCompression {
+Zstd,
+Lz4Frame,
+#[default]
+Uncompressed,
+}

Review Comment:
   I'll change the default codec after experiments. 






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-17 Thread via GitHub


ding-young commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2153447729


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -44,16 +44,23 @@ pub struct SpillManager {
 schema: SchemaRef,
 /// Number of batches to buffer in memory during disk reads
 batch_read_buffer_capacity: usize,
-// TODO: Add general-purpose compression options
+/// general-purpose compression options
+pub(crate) compression: SpillCompression,
 }
 
 impl SpillManager {
-pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
+pub fn new(
+env: Arc<RuntimeEnv>,
+metrics: SpillMetrics,
+schema: SchemaRef,
+compression: SpillCompression,

Review Comment:
   Thanks. This removed some diffs in existing unit tests since the signature 
of `new()` remains the same. 






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-17 Thread via GitHub


ding-young commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2153442938


##
datafusion/core/tests/memory_limit/mod.rs:
##
@@ -630,6 +635,77 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> {
 Ok(())
 }
 
+/// External query should succeed using zstd as spill compression codec,
+/// and all temporary spill files are properly cleaned up after execution.
+/// Note: This test does not inspect file contents (e.g. magic number),
+/// as spill files are automatically deleted on drop.
+#[tokio::test]
+async fn test_spill_file_compressed_with_zstd() -> Result<()> {
+let disk_spill_limit = 1024 * 1024; // 1MB
+let spill_compression = SpillCompression::Zstd;
+let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd
+
+let df = ctx
+.sql("select * from generate_series(1, 10) as t1(v1) order by v1")
+.await
+.unwrap();
+let plan = df.create_physical_plan().await.unwrap();
+
+let task_ctx = ctx.task_ctx();
+let _ = collect_batches(Arc::clone(&plan), task_ctx)
+.await
+.expect("Query execution failed");
+
+let spill_count = plan.metrics().unwrap().spill_count().unwrap();
+let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
+
+println!("spill count {spill_count}");
+assert!(spill_count > 0);
+assert!((spilled_bytes as u64) < disk_spill_limit);
+
+// Verify that all temporary files have been properly cleaned up by checking
+// that the total disk usage tracked by the disk manager is zero
+let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
+assert_eq!(current_disk_usage, 0);
+
+Ok(())
+}
+
+/// External query should succeed using lz4_frame as spill compression codec,
+/// and all temporary spill files are properly cleaned up after execution.
+/// Note: This test does not inspect file contents (e.g. magic number),
+/// as spill files are automatically deleted on drop.
+#[tokio::test]
+async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> {

Review Comment:
   I added a test here, but unfortunately it is not straightforward to check 
whether the file is actually compressed with the desired codec in an e2e test. 
Maybe we can compare `spilled_bytes` after a follow-up fix.
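
The doc comment in the diff mentions inspecting a magic number. A minimal sketch of what such a check could look like on a raw byte slice, assuming the slice starts exactly at a compressed frame. The names `FrameCodec` and `detect_frame_codec` are hypothetical and not part of DataFusion, and locating the compressed buffers inside an Arrow IPC stream is not covered here:

```rust
/// Hypothetical codec tag used only in this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameCodec {
    Zstd,
    Lz4Frame,
}

/// Zstandard frame magic number 0xFD2FB528, stored little-endian.
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
/// LZ4 frame magic number 0x184D2204, stored little-endian.
const LZ4_FRAME_MAGIC: [u8; 4] = [0x04, 0x22, 0x4D, 0x18];

/// Returns the codec whose frame magic number the slice starts with, if any.
/// Assumption: `bytes` begins at the first byte of a compressed frame.
pub fn detect_frame_codec(bytes: &[u8]) -> Option<FrameCodec> {
    if bytes.starts_with(&ZSTD_MAGIC) {
        Some(FrameCodec::Zstd)
    } else if bytes.starts_with(&LZ4_FRAME_MAGIC) {
        Some(FrameCodec::Lz4Frame)
    } else {
        None
    }
}
```

Since the spill files are deleted on drop, a check like this would only apply to bytes captured before cleanup, which is why comparing `spilled_bytes` may be the more practical route.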






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-17 Thread via GitHub


ding-young commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2151623165


##
datafusion/common/src/config.rs:
##
@@ -274,6 +276,60 @@ config_namespace! {
 }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SpillCompression {
+Zstd,
+Lz4Frame,
+Uncompressed,
+}
+
+impl FromStr for SpillCompression {
+type Err = DataFusionError;
+
+fn from_str(s: &str) -> Result<Self, Self::Err> {
+match s.to_ascii_lowercase().as_str() {
+"zstd" => Ok(Self::Zstd),
+"lz4_frame" => Ok(Self::Lz4Frame),
+"uncompressed" | "" => Ok(Self::Uncompressed),
+other => Err(DataFusionError::Execution(format!(
+"Invalid Spill file compression type: {other}. Expected one of: zstd, lz4, uncompressed"
+))),
+}
+}
+}
+
+impl ConfigField for SpillCompression {
+fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
+v.some(key, self, description)
+}
+
+fn set(&mut self, _: &str, value: &str) -> Result<()> {
+*self = SpillCompression::from_str(value)?;
+Ok(())
+}
+}
+
+impl Display for SpillCompression {
+fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+let str = match self {
+Self::Zstd => "zstd",
+Self::Lz4Frame => "lz4_frame",
+Self::Uncompressed => "",

Review Comment:
   Thanks for the catch. After I made that change, the "default" field in the 
docs shows "uncompressed".
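
A minimal, self-contained sketch of why this matters, assuming the `Display` arm for `Uncompressed` now emits `"uncompressed"` instead of an empty string. The enum below is a stand-in copy rather than the real `datafusion_common` type, and the `String` error is a simplification for illustration:

```rust
use std::fmt;
use std::str::FromStr;

/// Stand-in copy of the enum from the diff (not the real DataFusion type).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

impl fmt::Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every variant prints a non-empty name, so generated docs can show it.
        let name = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{name}")
    }
}

impl FromStr for SpillCompression {
    // Simplified error type for this sketch only.
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            // The empty string is still accepted as in the diff.
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(format!("invalid spill compression: {other}")),
        }
    }
}
```

With this shape, whatever `Display` prints can be parsed back by `FromStr`, so the value shown in the docs is also a value a user can set.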
   
   






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-16 Thread via GitHub


ding-young commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2149653552


##
datafusion/common/src/config.rs:
##
@@ -330,6 +386,13 @@ config_namespace! {
 /// the new schema verification step.
 pub skip_physical_aggregate_schema_check: bool, default = false
 
+/// Sets the compression codec used when spilling data to disk.
+///
+/// Since datafusion writes spill files using the Arrow IPC Stream format,
+/// only codecs supported by the Arrow IPC Stream Writer are allowed.
+/// Valid values are: uncompressed, lz4_frame, zstd
+pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

Review Comment:
   Thank you! After doing some performance investigation, I'll set the optimal 
option as the default.






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-11 Thread via GitHub


2010YOUY01 commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2141401180


##
datafusion/common/src/config.rs:
##
@@ -274,6 +276,60 @@ config_namespace! {
 }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SpillCompression {
+Zstd,
+Lz4Frame,
+Uncompressed,
+}
+
+impl FromStr for SpillCompression {
+type Err = DataFusionError;
+
+fn from_str(s: &str) -> Result<Self, Self::Err> {
+match s.to_ascii_lowercase().as_str() {
+"zstd" => Ok(Self::Zstd),
+"lz4_frame" => Ok(Self::Lz4Frame),
+"uncompressed" | "" => Ok(Self::Uncompressed),
+other => Err(DataFusionError::Execution(format!(
+"Invalid Spill file compression type: {other}. Expected one of: zstd, lz4, uncompressed"

Review Comment:
   ```suggestion
   "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
   ```



##
datafusion/common/src/config.rs:
##
@@ -274,6 +276,60 @@ config_namespace! {
 }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SpillCompression {
+Zstd,
+Lz4Frame,
+Uncompressed,
+}
+
+impl FromStr for SpillCompression {
+type Err = DataFusionError;
+
+fn from_str(s: &str) -> Result<Self, Self::Err> {
+match s.to_ascii_lowercase().as_str() {
+"zstd" => Ok(Self::Zstd),
+"lz4_frame" => Ok(Self::Lz4Frame),
+"uncompressed" | "" => Ok(Self::Uncompressed),
+other => Err(DataFusionError::Execution(format!(

Review Comment:
   `DataFusionError::Config` can be more suitable here.
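
A rough sketch of the parser with a configuration-style error, to show the shape of the change. `SketchError` is a hypothetical stand-in so the example compiles on its own, and the exact variant name in DataFusion's error enum should be checked against the crate. The message also uses the corrected `lz4_frame` spelling:

```rust
use std::fmt;
use std::str::FromStr;

/// Stand-in copy of the enum from the diff (not the real DataFusion type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    Uncompressed,
}

/// Hypothetical error type standing in for DataFusion's error enum.
#[derive(Debug, PartialEq, Eq)]
pub enum SketchError {
    /// Raised for invalid configuration values rather than runtime failures.
    Config(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Config(msg) => write!(f, "Invalid or unsupported configuration: {msg}"),
        }
    }
}

impl std::error::Error for SketchError {}

impl FromStr for SpillCompression {
    type Err = SketchError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            // A bad option value is a configuration problem, not an execution one.
            other => Err(SketchError::Config(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}
```

Callers can then tell a mistyped option apart from a failure during query execution by matching on the error variant.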



##
datafusion/common/src/config.rs:
##
@@ -330,6 +386,13 @@ config_namespace! {
 /// the new schema verification step.
 pub skip_physical_aggregate_schema_check: bool, default = false
 
+/// Sets the compression codec used when spilling data to disk.
+///
+/// Since datafusion writes spill files using the Arrow IPC Stream format,
+/// only codecs supported by the Arrow IPC Stream Writer are allowed.
+/// Valid values are: uncompressed, lz4_frame, zstd
+pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

Review Comment:
   `lz4` is likely to provide the best speed/space tradeoff; we'll run some 
benchmarks to confirm it.



##
datafusion/common/src/config.rs:
##
@@ -330,6 +386,13 @@ config_namespace! {
 /// the new schema verification step.
 pub skip_physical_aggregate_schema_check: bool, default = false
 
+/// Sets the compression codec used when spilling data to disk.
+///
+/// Since datafusion writes spill files using the Arrow IPC Stream format,
+/// only codecs supported by the Arrow IPC Stream Writer are allowed.
+/// Valid values are: uncompressed, lz4_frame, zstd

Review Comment:
   I suggest mentioning this in the doc (as a quick reminder): lz4 is faster to 
compress but has a lower compression ratio, whereas zstd is slower to compress 
but achieves a higher compression ratio.
   
   We can run some experiments in the future to make this info more specific, 
such as the measured compression ratio and speed on TPC-H tables in Arrow 
batches.



##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -44,16 +44,23 @@ pub struct SpillManager {
 schema: SchemaRef,
 /// Number of batches to buffer in memory during disk reads
 batch_read_buffer_capacity: usize,
-// TODO: Add general-purpose compression options
+/// general-purpose compression options
+pub(crate) compression: SpillCompression,
 }
 
 impl SpillManager {
-pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
+pub fn new(
+env: Arc<RuntimeEnv>,
+metrics: SpillMetrics,
+schema: SchemaRef,
+compression: SpillCompression,

Review Comment:
   I suggest using a builder pattern to set this new field. The reason is that 
`SpillManager` is a public interface that can be used outside the crate, so 
it's better to keep the API stable.
   
   To do that, use a default compression option in `new()`, and add a 
`with_compression_type()` function to change the default compression type.
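
A minimal sketch of that builder-style shape, assuming `new()` keeps its three arguments. `RuntimeEnv`, `SpillMetrics`, `Schema`, and the `compression()` accessor are simplified stand-ins introduced only so the example compiles on its own; the real types live in DataFusion and Arrow:

```rust
use std::sync::Arc;

/// Stand-in copy of the enum from the diff (not the real DataFusion type).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

// Empty placeholder types standing in for the real runtime, metrics, and schema.
#[derive(Debug, Default)]
pub struct RuntimeEnv;
#[derive(Debug, Default)]
pub struct SpillMetrics;
#[derive(Debug, Default)]
pub struct Schema;

#[allow(dead_code)]
pub struct SpillManager {
    env: Arc<RuntimeEnv>,
    metrics: SpillMetrics,
    schema: Arc<Schema>,
    compression: SpillCompression,
}

impl SpillManager {
    /// Unchanged three-argument constructor; compression starts at the default.
    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: Arc<Schema>) -> Self {
        Self {
            env,
            metrics,
            schema,
            compression: SpillCompression::default(),
        }
    }

    /// Builder-style setter that overrides the default codec.
    pub fn with_compression_type(mut self, compression: SpillCompression) -> Self {
        self.compression = compression;
        self
    }

    /// Accessor added for this sketch so the chosen codec can be observed.
    pub fn compression(&self) -> SpillCompression {
        self.compression
    }
}
```

Existing callers of `new()` keep compiling unchanged, while callers that want compression chain `.with_compression_type(SpillCompression::Zstd)` onto the constructor.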



##
datafusion/common/src/config.rs:
##
@@ -274,6 +276,60 @@ config_namespace! {
 }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SpillCompression {
+Zstd,
+Lz4Frame,
+Uncompressed,
+}
+
+impl FromStr for SpillCompression {
+type Err = DataFusionError;
+
+fn from_str(s: &str) -> Result<Self, Self::Err> {
+match s.to_ascii_lowercase().as_str() {
+"zstd" => Ok(Self::Zstd),
+"lz4_frame" => Ok(Self::Lz4Frame),
+"uncompressed" | "" => Ok(Self::Uncompressed),
+other => Err(DataFusionError::Execution(format!(
+"Invalid Spill file compression type: {other}. Expected one 
of: zstd

Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-11 Thread via GitHub


xudong963 commented on code in PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#discussion_r2139649288


##
datafusion/common/src/config.rs:
##
@@ -330,6 +386,13 @@ config_namespace! {
 /// the new schema verification step.
 pub skip_physical_aggregate_schema_check: bool, default = false
 
+/// Sets the compression codec used when spilling data to disk.
+///
+/// Since datafusion writes spill files using the Arrow IPC Stream format,
+/// only codecs supported by the Arrow IPC Stream Writer are allowed.
+/// Valid values are: uncompressed, lz4_frame, zstd
+pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

Review Comment:
   Should we choose a compression method that is optimal in most cases as the 
default?
   
   FYI, Spark's default is lz4.
   
   






Re: [PR] Add compression option to SpillManager [datafusion]

2025-06-10 Thread via GitHub


ding-young commented on PR #16268:
URL: https://github.com/apache/datafusion/pull/16268#issuecomment-2961345444

   This is ready for review :) @2010YOUY01 

