This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bb185a64d1 Avoid to use TempDir::into_path for temporary dirs expected 
to be deleted automatically (#7252)
bb185a64d1 is described below

commit bb185a64d1ec14eb3f6c638147d382a2a0fcf4dd
Author: Kousuke Saruta <[email protected]>
AuthorDate: Thu Aug 10 23:38:49 2023 +0900

    Avoid to use TempDir::into_path for temporary dirs expected to be deleted 
automatically (#7252)
    
    * Fix to avoid using TempDir::into_path.
    
    * Modify tests.
---
 .../core/src/datasource/physical_plan/csv.rs       | 20 ++++++++++--
 .../core/src/datasource/physical_plan/json.rs      | 25 +++++++++++----
 .../core/src/physical_plan/coalesce_partitions.rs  |  4 ++-
 datafusion/core/src/physical_plan/filter.rs        |  7 ++--
 datafusion/core/src/physical_plan/limit.rs         | 13 +++++---
 datafusion/core/src/physical_plan/projection.rs    | 13 +++++---
 datafusion/core/src/physical_plan/sorts/sort.rs    | 10 ++++--
 .../physical_plan/sorts/sort_preserving_merge.rs   | 37 +++++++++++++++-------
 datafusion/core/src/physical_plan/union.rs         |  7 ++--
 datafusion/core/src/physical_plan/windows/mod.rs   | 17 +++++++---
 datafusion/core/src/test/mod.rs                    | 10 +++---
 11 files changed, 116 insertions(+), 47 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index cf61b205b4..6bf5e36340 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -683,6 +683,7 @@ mod tests {
         let file_schema = aggr_test_schema();
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
+        let tmp_dir = TempDir::new()?;
 
         let file_groups = partitioned_file_groups(
             path.as_str(),
@@ -690,6 +691,7 @@ mod tests {
             1,
             FileType::CSV,
             file_compression_type.to_owned(),
+            tmp_dir.path(),
         )?;
 
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -747,6 +749,7 @@ mod tests {
         let file_schema = aggr_test_schema();
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
+        let tmp_dir = TempDir::new()?;
 
         let file_groups = partitioned_file_groups(
             path.as_str(),
@@ -754,6 +757,7 @@ mod tests {
             1,
             FileType::CSV,
             file_compression_type.to_owned(),
+            tmp_dir.path(),
         )?;
 
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -811,6 +815,7 @@ mod tests {
         let file_schema = aggr_test_schema();
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
+        let tmp_dir = TempDir::new()?;
 
         let file_groups = partitioned_file_groups(
             path.as_str(),
@@ -818,6 +823,7 @@ mod tests {
             1,
             FileType::CSV,
             file_compression_type.to_owned(),
+            tmp_dir.path(),
         )?;
 
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -875,6 +881,7 @@ mod tests {
         let file_schema = aggr_test_schema_with_missing_col();
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
+        let tmp_dir = TempDir::new()?;
 
         let file_groups = partitioned_file_groups(
             path.as_str(),
@@ -882,6 +889,7 @@ mod tests {
             1,
             FileType::CSV,
             file_compression_type.to_owned(),
+            tmp_dir.path(),
         )?;
 
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -927,6 +935,7 @@ mod tests {
         let file_schema = aggr_test_schema();
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
+        let tmp_dir = TempDir::new()?;
 
         let file_groups = partitioned_file_groups(
             path.as_str(),
@@ -934,6 +943,7 @@ mod tests {
             1,
             FileType::CSV,
             file_compression_type.to_owned(),
+            tmp_dir.path(),
         )?;
 
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -1021,7 +1031,7 @@ mod tests {
     async fn test_additional_stores(
         file_compression_type: FileCompressionType,
         store: Arc<dyn ObjectStore>,
-    ) {
+    ) -> Result<()> {
         let ctx = SessionContext::new();
         let url = Url::parse("file://").unwrap();
         ctx.runtime_env().register_object_store(&url, store.clone());
@@ -1031,6 +1041,7 @@ mod tests {
         let file_schema = aggr_test_schema();
         let path = format!("{}/csv", arrow_test_data());
         let filename = "aggregate_test_100.csv";
+        let tmp_dir = TempDir::new()?;
 
         let file_groups = partitioned_file_groups(
             path.as_str(),
@@ -1038,6 +1049,7 @@ mod tests {
             1,
             FileType::CSV,
             file_compression_type.to_owned(),
+            tmp_dir.path(),
         )
         .unwrap();
 
@@ -1057,6 +1069,7 @@ mod tests {
         let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
 
         assert_eq!(total_rows, 100);
+        Ok(())
     }
 
     #[rstest(
@@ -1072,7 +1085,7 @@ mod tests {
     async fn test_chunked_csv(
         file_compression_type: FileCompressionType,
         #[values(10, 20, 30, 40)] chunk_size: usize,
-    ) {
+    ) -> Result<()> {
         test_additional_stores(
             file_compression_type,
             Arc::new(ChunkedStore::new(
@@ -1080,7 +1093,8 @@ mod tests {
                 chunk_size,
             )),
         )
-        .await;
+        .await?;
+        Ok(())
     }
 
     #[tokio::test]
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs 
b/datafusion/core/src/datasource/physical_plan/json.rs
index 168ffdb364..b8ad2aa0a6 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -342,6 +342,7 @@ mod tests {
     async fn prepare_store(
         state: &SessionState,
         file_compression_type: FileCompressionType,
+        work_dir: &Path,
     ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
         let store_url = ObjectStoreUrl::local_filesystem();
         let store = state.runtime_env().object_store(&store_url).unwrap();
@@ -353,6 +354,7 @@ mod tests {
             1,
             FileType::JSON,
             file_compression_type.to_owned(),
+            work_dir,
         )
         .unwrap();
         let meta = file_groups
@@ -374,17 +376,19 @@ mod tests {
     async fn test_additional_stores(
         file_compression_type: FileCompressionType,
         store: Arc<dyn ObjectStore>,
-    ) {
+    ) -> Result<()> {
         let ctx = SessionContext::new();
         let url = Url::parse("file://").unwrap();
         ctx.runtime_env().register_object_store(&url, store.clone());
         let filename = "1.json";
+        let tmp_dir = TempDir::new()?;
         let file_groups = partitioned_file_groups(
             TEST_DATA_BASE,
             filename,
             1,
             FileType::JSON,
             file_compression_type.to_owned(),
+            tmp_dir.path(),
         )
         .unwrap();
         let path = file_groups
@@ -424,6 +428,7 @@ mod tests {
             ],
             &results
         );
+        Ok(())
     }
 
     #[rstest(
@@ -444,8 +449,9 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
 
+        let tmp_dir = TempDir::new()?;
         let (object_store_url, file_groups, file_schema) =
-            prepare_store(&state, file_compression_type.to_owned()).await;
+            prepare_store(&state, file_compression_type.to_owned(), 
tmp_dir.path()).await;
 
         let exec = NdJsonExec::new(
             FileScanConfig {
@@ -515,8 +521,10 @@ mod tests {
         let state = session_ctx.state();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
+
+        let tmp_dir = TempDir::new()?;
         let (object_store_url, file_groups, actual_schema) =
-            prepare_store(&state, file_compression_type.to_owned()).await;
+            prepare_store(&state, file_compression_type.to_owned(), 
tmp_dir.path()).await;
 
         let mut builder = SchemaBuilder::from(actual_schema.fields());
         builder.push(Field::new("missing_col", DataType::Int32, true));
@@ -568,8 +576,9 @@ mod tests {
         let session_ctx = SessionContext::new();
         let state = session_ctx.state();
         let task_ctx = session_ctx.task_ctx();
+        let tmp_dir = TempDir::new()?;
         let (object_store_url, file_groups, file_schema) =
-            prepare_store(&state, file_compression_type.to_owned()).await;
+            prepare_store(&state, file_compression_type.to_owned(), 
tmp_dir.path()).await;
 
         let exec = NdJsonExec::new(
             FileScanConfig {
@@ -620,8 +629,9 @@ mod tests {
         let session_ctx = SessionContext::new();
         let state = session_ctx.state();
         let task_ctx = session_ctx.task_ctx();
+        let tmp_dir = TempDir::new()?;
         let (object_store_url, file_groups, file_schema) =
-            prepare_store(&state, file_compression_type.to_owned()).await;
+            prepare_store(&state, file_compression_type.to_owned(), 
tmp_dir.path()).await;
 
         let exec = NdJsonExec::new(
             FileScanConfig {
@@ -729,7 +739,7 @@ mod tests {
     async fn test_chunked_json(
         file_compression_type: FileCompressionType,
         #[values(10, 20, 30, 40)] chunk_size: usize,
-    ) {
+    ) -> Result<()> {
         test_additional_stores(
             file_compression_type,
             Arc::new(ChunkedStore::new(
@@ -737,7 +747,8 @@ mod tests {
                 chunk_size,
             )),
         )
-        .await;
+        .await?;
+        Ok(())
     }
 
     #[tokio::test]
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs 
b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 14e8aada6c..cc2c9b5e4b 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -173,6 +173,7 @@ mod tests {
 
     use arrow::datatypes::{DataType, Field, Schema};
     use futures::FutureExt;
+    use tempfile::TempDir;
 
     use super::*;
     use crate::physical_plan::{collect, common};
@@ -186,7 +187,8 @@ mod tests {
         let task_ctx = Arc::new(TaskContext::default());
 
         let num_partitions = 4;
-        let csv = test::scan_partitioned_csv(num_partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
 
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), 
num_partitions);
diff --git a/datafusion/core/src/physical_plan/filter.rs 
b/datafusion/core/src/physical_plan/filter.rs
index f9fc4fb4b6..084b0b15d1 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -391,6 +391,7 @@ mod tests {
     use datafusion_expr::Operator;
     use std::iter::Iterator;
     use std::sync::Arc;
+    use tempfile::TempDir;
 
     #[tokio::test]
     async fn simple_predicate() -> Result<()> {
@@ -398,7 +399,8 @@ mod tests {
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
 
         let predicate: Arc<dyn PhysicalExpr> = binary(
             binary(col("c2", &schema)?, Operator::Gt, lit(1u32), &schema)?,
@@ -425,7 +427,8 @@ mod tests {
     async fn with_new_children() -> Result<()> {
         let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let input = test::scan_partitioned_csv(partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let input = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
 
         let predicate: Arc<dyn PhysicalExpr> =
             binary(col("c2", &schema)?, Operator::Gt, lit(1u32), &schema)?;
diff --git a/datafusion/core/src/physical_plan/limit.rs 
b/datafusion/core/src/physical_plan/limit.rs
index c5d5fbcfd1..1b86bfd9dc 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -528,6 +528,7 @@ impl RecordBatchStream for LimitStream {
 mod tests {
 
     use common::collect;
+    use tempfile::TempDir;
 
     use super::*;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -539,7 +540,8 @@ mod tests {
         let task_ctx = Arc::new(TaskContext::default());
 
         let num_partitions = 4;
-        let csv = test::scan_partitioned_csv(num_partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
 
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), 
num_partitions);
@@ -655,7 +657,8 @@ mod tests {
         let task_ctx = Arc::new(TaskContext::default());
 
         let num_partitions = 4;
-        let csv = test::scan_partitioned_csv(num_partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
 
         assert_eq!(csv.output_partitioning().partition_count(), 
num_partitions);
 
@@ -744,7 +747,8 @@ mod tests {
         fetch: Option<usize>,
     ) -> Result<Option<usize>> {
         let num_partitions = 4;
-        let csv = test::scan_partitioned_csv(num_partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
 
         assert_eq!(csv.output_partitioning().partition_count(), 
num_partitions);
 
@@ -758,7 +762,8 @@ mod tests {
         num_partitions: usize,
         fetch: usize,
     ) -> Result<Option<usize>> {
-        let csv = test::scan_partitioned_csv(num_partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
 
         assert_eq!(csv.output_partitioning().partition_count(), 
num_partitions);
 
diff --git a/datafusion/core/src/physical_plan/projection.rs 
b/datafusion/core/src/physical_plan/projection.rs
index ca4a3e54f5..4391922967 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -433,6 +433,7 @@ mod tests {
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::binary;
     use futures::future;
+    use tempfile::TempDir;
 
     // Create a binary expression without coercion. Used here when we do not 
want to coerce the expressions
     // to valid types. Usage can result in an execution (after plan) error.
@@ -451,7 +452,8 @@ mod tests {
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
 
         // pick column c1 and name it column c1 in the output schema
         let projection =
@@ -488,7 +490,8 @@ mod tests {
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
 
         // pick column c1 and name it column c1 in the output schema
         let projection =
@@ -502,7 +505,8 @@ mod tests {
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
 
         let c1 = col("c2", &schema).unwrap();
         let c2 = col("c9", &schema).unwrap();
@@ -519,7 +523,8 @@ mod tests {
     async fn project_no_column() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
 
-        let csv = test::scan_partitioned_csv(1)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(1, tmp_dir.path())?;
         let expected = collect(csv.execute(0, 
task_ctx.clone())?).await.unwrap();
 
         let projection = ProjectionExec::try_new(vec![], csv)?;
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 3bedbd17e8..52936dc55e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -899,12 +899,14 @@ mod tests {
     use datafusion_execution::runtime_env::RuntimeConfig;
     use futures::FutureExt;
     use std::collections::HashMap;
+    use tempfile::TempDir;
 
     #[tokio::test]
     async fn test_in_mem_sort() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
         let schema = csv.schema();
 
         let sort_exec = Arc::new(SortExec::new(
@@ -973,7 +975,8 @@ mod tests {
         );
 
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions)?;
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
         let schema = csv.schema();
 
         let sort_exec = Arc::new(SortExec::new(
@@ -1066,7 +1069,8 @@ mod tests {
                     .with_session_config(session_config),
             );
 
-            let csv = test::scan_partitioned_csv(partitions)?;
+            let tmp_dir = TempDir::new()?;
+            let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
             let schema = csv.schema();
 
             let sort_exec = Arc::new(
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs 
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index bc0ac678f0..97d5813610 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -275,6 +275,7 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use datafusion_execution::config::SessionConfig;
     use futures::{FutureExt, StreamExt};
+    use tempfile::TempDir;
 
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
@@ -556,10 +557,11 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_partition_sort() {
+    async fn test_partition_sort() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions).unwrap();
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, 
tmp_dir.path()).unwrap();
         let schema = csv.schema();
 
         let sort = vec![
@@ -598,6 +600,8 @@ mod tests {
             basic, partition,
             "basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
         );
+
+        Ok(())
     }
 
     // Split the provided record batch into multiple batch_size record batches
@@ -627,18 +631,21 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         sizes: &[usize],
         context: Arc<TaskContext>,
-    ) -> Arc<dyn ExecutionPlan> {
+    ) -> Result<Arc<dyn ExecutionPlan>> {
         let partitions = 4;
-        let csv = test::scan_partitioned_csv(partitions).unwrap();
+        let tmp_dir = TempDir::new()?;
+        let csv = test::scan_partitioned_csv(partitions, 
tmp_dir.path()).unwrap();
 
         let sorted = basic_sort(csv, sort, context).await;
         let split: Vec<_> = sizes.iter().map(|x| split_batch(&sorted, 
*x)).collect();
 
-        Arc::new(MemoryExec::try_new(&split, sorted.schema(), None).unwrap())
+        Ok(Arc::new(
+            MemoryExec::try_new(&split, sorted.schema(), None).unwrap(),
+        ))
     }
 
     #[tokio::test]
-    async fn test_partition_sort_streaming_input() {
+    async fn test_partition_sort_streaming_input() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let sort = vec![
@@ -665,7 +672,8 @@ mod tests {
         ];
 
         let input =
-            sorted_partitioned_input(sort.clone(), &[10, 3, 11], 
task_ctx.clone()).await;
+            sorted_partitioned_input(sort.clone(), &[10, 3, 11], 
task_ctx.clone())
+                .await?;
         let basic = basic_sort(input.clone(), sort.clone(), 
task_ctx.clone()).await;
         let partition = sorted_merge(input, sort, task_ctx.clone()).await;
 
@@ -680,10 +688,12 @@ mod tests {
             .to_string();
 
         assert_eq!(basic, partition);
+
+        Ok(())
     }
 
     #[tokio::test]
-    async fn test_partition_sort_streaming_input_output() {
+    async fn test_partition_sort_streaming_input_output() -> Result<()> {
         let schema = test_util::aggr_test_schema();
 
         let sort = vec![
@@ -702,7 +712,8 @@ mod tests {
         // Test streaming with default batch size
         let task_ctx = Arc::new(TaskContext::default());
         let input =
-            sorted_partitioned_input(sort.clone(), &[10, 5, 13], 
task_ctx.clone()).await;
+            sorted_partitioned_input(sort.clone(), &[10, 5, 13], 
task_ctx.clone())
+                .await?;
         let basic = basic_sort(input.clone(), sort.clone(), task_ctx).await;
 
         // batch size of 23
@@ -726,6 +737,8 @@ mod tests {
             .to_string();
 
         assert_eq!(basic, partition);
+
+        Ok(())
     }
 
     #[tokio::test]
@@ -810,7 +823,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_async() {
+    async fn test_async() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let sort = vec![PhysicalSortExpr {
@@ -819,7 +832,7 @@ mod tests {
         }];
 
         let batches =
-            sorted_partitioned_input(sort.clone(), &[5, 7, 3], 
task_ctx.clone()).await;
+            sorted_partitioned_input(sort.clone(), &[5, 7, 3], 
task_ctx.clone()).await?;
 
         let partition_count = batches.output_partitioning().partition_count();
         let mut streams = Vec::with_capacity(partition_count);
@@ -874,6 +887,8 @@ mod tests {
             basic, partition,
             "basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
         );
+
+        Ok(())
     }
 
     #[tokio::test]
diff --git a/datafusion/core/src/physical_plan/union.rs 
b/datafusion/core/src/physical_plan/union.rs
index e29c96da09..034c867040 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -610,14 +610,17 @@ mod tests {
 
     use crate::{physical_plan::collect, scalar::ScalarValue};
     use arrow::record_batch::RecordBatch;
+    use tempfile::TempDir;
 
     #[tokio::test]
     async fn test_union_partitions() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
 
+        let tmp_dir = TempDir::new()?;
+
         // Create csv's with different partitioning
-        let csv = test::scan_partitioned_csv(4)?;
-        let csv2 = test::scan_partitioned_csv(5)?;
+        let csv = test::scan_partitioned_csv(4, tmp_dir.path())?;
+        let csv2 = test::scan_partitioned_csv(5, tmp_dir.path())?;
 
         let union_exec = Arc::new(UnionExec::new(vec![csv, csv2]));
 
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs 
b/datafusion/core/src/physical_plan/windows/mod.rs
index cba7df772e..17c21834e1 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -372,9 +372,14 @@ mod tests {
     use datafusion_execution::TaskContext;
     use datafusion_expr::{create_udaf, Accumulator, Volatility};
     use futures::FutureExt;
-
-    fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, 
SchemaRef)> {
-        let csv = test::scan_partitioned_csv(partitions)?;
+    use std::path::Path;
+    use tempfile::TempDir;
+
+    fn create_test_schema(
+        partitions: usize,
+        work_dir: &Path,
+    ) -> Result<(Arc<CsvExec>, SchemaRef)> {
+        let csv = test::scan_partitioned_csv(partitions, work_dir)?;
         let schema = csv.schema();
         Ok((csv, schema))
     }
@@ -547,7 +552,8 @@ mod tests {
         );
 
         let task_ctx = Arc::new(TaskContext::default());
-        let (input, schema) = create_test_schema(1)?;
+        let tmp_dir = TempDir::new()?;
+        let (input, schema) = create_test_schema(1, tmp_dir.path())?;
 
         let window_exec = Arc::new(WindowAggExec::try_new(
             vec![create_window_expr(
@@ -579,7 +585,8 @@ mod tests {
     #[tokio::test]
     async fn window_function() -> Result<()> {
         let task_ctx = Arc::new(TaskContext::default());
-        let (input, schema) = create_test_schema(1)?;
+        let tmp_dir = TempDir::new()?;
+        let (input, schema) = create_test_schema(1, tmp_dir.path())?;
 
         let window_exec = Arc::new(WindowAggExec::try_new(
             vec![
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 6e2bdfeeca..b7bf2a851a 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -47,9 +47,9 @@ use futures::{Future, FutureExt};
 use std::fs::File;
 use std::io::prelude::*;
 use std::io::{BufReader, BufWriter};
+use std::path::Path;
 use std::pin::Pin;
 use std::sync::Arc;
-use tempfile::TempDir;
 #[cfg(feature = "compression")]
 use xz2::write::XzEncoder;
 #[cfg(feature = "compression")]
@@ -73,7 +73,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
 }
 
 /// Returns a [`CsvExec`] that scans "aggregate_test_100.csv" with 
`partitions` partitions
-pub fn scan_partitioned_csv(partitions: usize) -> Result<Arc<CsvExec>> {
+pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> 
Result<Arc<CsvExec>> {
     let schema = aggr_test_schema();
     let filename = "aggregate_test_100.csv";
     let path = format!("{}/csv", arrow_test_data());
@@ -83,6 +83,7 @@ pub fn scan_partitioned_csv(partitions: usize) -> 
Result<Arc<CsvExec>> {
         partitions,
         FileType::CSV,
         FileCompressionType::UNCOMPRESSED,
+        work_dir,
     )?;
     let config = partitioned_csv_config(schema, file_groups)?;
     Ok(Arc::new(CsvExec::new(
@@ -102,11 +103,10 @@ pub fn partitioned_file_groups(
     partitions: usize,
     file_type: FileType,
     file_compression_type: FileCompressionType,
+    work_dir: &Path,
 ) -> Result<Vec<Vec<PartitionedFile>>> {
     let path = format!("{path}/{filename}");
 
-    let tmp_dir = TempDir::new()?.into_path();
-
     let mut writers = vec![];
     let mut files = vec![];
     for i in 0..partitions {
@@ -118,7 +118,7 @@ pub fn partitioned_file_groups(
                 .get_ext_with_compression(file_compression_type.to_owned())
                 .unwrap()
         );
-        let filename = tmp_dir.join(filename);
+        let filename = work_dir.join(filename);
 
         let file = File::create(&filename).unwrap();
 

Reply via email to