This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bb185a64d1 Avoid using TempDir::into_path for temporary dirs expected
to be deleted automatically (#7252)
bb185a64d1 is described below
commit bb185a64d1ec14eb3f6c638147d382a2a0fcf4dd
Author: Kousuke Saruta <[email protected]>
AuthorDate: Thu Aug 10 23:38:49 2023 +0900
Avoid using TempDir::into_path for temporary dirs expected to be deleted
automatically (#7252)
* Fix to avoid using TempDir::into_path.
* Modify tests.
---
.../core/src/datasource/physical_plan/csv.rs | 20 ++++++++++--
.../core/src/datasource/physical_plan/json.rs | 25 +++++++++++----
.../core/src/physical_plan/coalesce_partitions.rs | 4 ++-
datafusion/core/src/physical_plan/filter.rs | 7 ++--
datafusion/core/src/physical_plan/limit.rs | 13 +++++---
datafusion/core/src/physical_plan/projection.rs | 13 +++++---
datafusion/core/src/physical_plan/sorts/sort.rs | 10 ++++--
.../physical_plan/sorts/sort_preserving_merge.rs | 37 +++++++++++++++-------
datafusion/core/src/physical_plan/union.rs | 7 ++--
datafusion/core/src/physical_plan/windows/mod.rs | 17 +++++++---
datafusion/core/src/test/mod.rs | 10 +++---
11 files changed, 116 insertions(+), 47 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index cf61b205b4..6bf5e36340 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -683,6 +683,7 @@ mod tests {
let file_schema = aggr_test_schema();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
+ let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
path.as_str(),
@@ -690,6 +691,7 @@ mod tests {
1,
FileType::CSV,
file_compression_type.to_owned(),
+ tmp_dir.path(),
)?;
let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -747,6 +749,7 @@ mod tests {
let file_schema = aggr_test_schema();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
+ let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
path.as_str(),
@@ -754,6 +757,7 @@ mod tests {
1,
FileType::CSV,
file_compression_type.to_owned(),
+ tmp_dir.path(),
)?;
let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -811,6 +815,7 @@ mod tests {
let file_schema = aggr_test_schema();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
+ let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
path.as_str(),
@@ -818,6 +823,7 @@ mod tests {
1,
FileType::CSV,
file_compression_type.to_owned(),
+ tmp_dir.path(),
)?;
let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -875,6 +881,7 @@ mod tests {
let file_schema = aggr_test_schema_with_missing_col();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
+ let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
path.as_str(),
@@ -882,6 +889,7 @@ mod tests {
1,
FileType::CSV,
file_compression_type.to_owned(),
+ tmp_dir.path(),
)?;
let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -927,6 +935,7 @@ mod tests {
let file_schema = aggr_test_schema();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
+ let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
path.as_str(),
@@ -934,6 +943,7 @@ mod tests {
1,
FileType::CSV,
file_compression_type.to_owned(),
+ tmp_dir.path(),
)?;
let mut config = partitioned_csv_config(file_schema, file_groups)?;
@@ -1021,7 +1031,7 @@ mod tests {
async fn test_additional_stores(
file_compression_type: FileCompressionType,
store: Arc<dyn ObjectStore>,
- ) {
+ ) -> Result<()> {
let ctx = SessionContext::new();
let url = Url::parse("file://").unwrap();
ctx.runtime_env().register_object_store(&url, store.clone());
@@ -1031,6 +1041,7 @@ mod tests {
let file_schema = aggr_test_schema();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";
+ let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
path.as_str(),
@@ -1038,6 +1049,7 @@ mod tests {
1,
FileType::CSV,
file_compression_type.to_owned(),
+ tmp_dir.path(),
)
.unwrap();
@@ -1057,6 +1069,7 @@ mod tests {
let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
assert_eq!(total_rows, 100);
+ Ok(())
}
#[rstest(
@@ -1072,7 +1085,7 @@ mod tests {
async fn test_chunked_csv(
file_compression_type: FileCompressionType,
#[values(10, 20, 30, 40)] chunk_size: usize,
- ) {
+ ) -> Result<()> {
test_additional_stores(
file_compression_type,
Arc::new(ChunkedStore::new(
@@ -1080,7 +1093,8 @@ mod tests {
chunk_size,
)),
)
- .await;
+ .await?;
+ Ok(())
}
#[tokio::test]
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index 168ffdb364..b8ad2aa0a6 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -342,6 +342,7 @@ mod tests {
async fn prepare_store(
state: &SessionState,
file_compression_type: FileCompressionType,
+ work_dir: &Path,
) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
let store_url = ObjectStoreUrl::local_filesystem();
let store = state.runtime_env().object_store(&store_url).unwrap();
@@ -353,6 +354,7 @@ mod tests {
1,
FileType::JSON,
file_compression_type.to_owned(),
+ work_dir,
)
.unwrap();
let meta = file_groups
@@ -374,17 +376,19 @@ mod tests {
async fn test_additional_stores(
file_compression_type: FileCompressionType,
store: Arc<dyn ObjectStore>,
- ) {
+ ) -> Result<()> {
let ctx = SessionContext::new();
let url = Url::parse("file://").unwrap();
ctx.runtime_env().register_object_store(&url, store.clone());
let filename = "1.json";
+ let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
TEST_DATA_BASE,
filename,
1,
FileType::JSON,
file_compression_type.to_owned(),
+ tmp_dir.path(),
)
.unwrap();
let path = file_groups
@@ -424,6 +428,7 @@ mod tests {
],
&results
);
+ Ok(())
}
#[rstest(
@@ -444,8 +449,9 @@ mod tests {
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
+ let tmp_dir = TempDir::new()?;
let (object_store_url, file_groups, file_schema) =
- prepare_store(&state, file_compression_type.to_owned()).await;
+ prepare_store(&state, file_compression_type.to_owned(),
tmp_dir.path()).await;
let exec = NdJsonExec::new(
FileScanConfig {
@@ -515,8 +521,10 @@ mod tests {
let state = session_ctx.state();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
+
+ let tmp_dir = TempDir::new()?;
let (object_store_url, file_groups, actual_schema) =
- prepare_store(&state, file_compression_type.to_owned()).await;
+ prepare_store(&state, file_compression_type.to_owned(),
tmp_dir.path()).await;
let mut builder = SchemaBuilder::from(actual_schema.fields());
builder.push(Field::new("missing_col", DataType::Int32, true));
@@ -568,8 +576,9 @@ mod tests {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = session_ctx.task_ctx();
+ let tmp_dir = TempDir::new()?;
let (object_store_url, file_groups, file_schema) =
- prepare_store(&state, file_compression_type.to_owned()).await;
+ prepare_store(&state, file_compression_type.to_owned(),
tmp_dir.path()).await;
let exec = NdJsonExec::new(
FileScanConfig {
@@ -620,8 +629,9 @@ mod tests {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = session_ctx.task_ctx();
+ let tmp_dir = TempDir::new()?;
let (object_store_url, file_groups, file_schema) =
- prepare_store(&state, file_compression_type.to_owned()).await;
+ prepare_store(&state, file_compression_type.to_owned(),
tmp_dir.path()).await;
let exec = NdJsonExec::new(
FileScanConfig {
@@ -729,7 +739,7 @@ mod tests {
async fn test_chunked_json(
file_compression_type: FileCompressionType,
#[values(10, 20, 30, 40)] chunk_size: usize,
- ) {
+ ) -> Result<()> {
test_additional_stores(
file_compression_type,
Arc::new(ChunkedStore::new(
@@ -737,7 +747,8 @@ mod tests {
chunk_size,
)),
)
- .await;
+ .await?;
+ Ok(())
}
#[tokio::test]
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs
b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 14e8aada6c..cc2c9b5e4b 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -173,6 +173,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use futures::FutureExt;
+ use tempfile::TempDir;
use super::*;
use crate::physical_plan::{collect, common};
@@ -186,7 +187,8 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let num_partitions = 4;
- let csv = test::scan_partitioned_csv(num_partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
diff --git a/datafusion/core/src/physical_plan/filter.rs
b/datafusion/core/src/physical_plan/filter.rs
index f9fc4fb4b6..084b0b15d1 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -391,6 +391,7 @@ mod tests {
use datafusion_expr::Operator;
use std::iter::Iterator;
use std::sync::Arc;
+ use tempfile::TempDir;
#[tokio::test]
async fn simple_predicate() -> Result<()> {
@@ -398,7 +399,8 @@ mod tests {
let schema = test_util::aggr_test_schema();
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
let predicate: Arc<dyn PhysicalExpr> = binary(
binary(col("c2", &schema)?, Operator::Gt, lit(1u32), &schema)?,
@@ -425,7 +427,8 @@ mod tests {
async fn with_new_children() -> Result<()> {
let schema = test_util::aggr_test_schema();
let partitions = 4;
- let input = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let input = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
let predicate: Arc<dyn PhysicalExpr> =
binary(col("c2", &schema)?, Operator::Gt, lit(1u32), &schema)?;
diff --git a/datafusion/core/src/physical_plan/limit.rs
b/datafusion/core/src/physical_plan/limit.rs
index c5d5fbcfd1..1b86bfd9dc 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -528,6 +528,7 @@ impl RecordBatchStream for LimitStream {
mod tests {
use common::collect;
+ use tempfile::TempDir;
use super::*;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -539,7 +540,8 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let num_partitions = 4;
- let csv = test::scan_partitioned_csv(num_partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
@@ -655,7 +657,8 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let num_partitions = 4;
- let csv = test::scan_partitioned_csv(num_partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
@@ -744,7 +747,8 @@ mod tests {
fetch: Option<usize>,
) -> Result<Option<usize>> {
let num_partitions = 4;
- let csv = test::scan_partitioned_csv(num_partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
@@ -758,7 +762,8 @@ mod tests {
num_partitions: usize,
fetch: usize,
) -> Result<Option<usize>> {
- let csv = test::scan_partitioned_csv(num_partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(num_partitions, tmp_dir.path())?;
assert_eq!(csv.output_partitioning().partition_count(),
num_partitions);
diff --git a/datafusion/core/src/physical_plan/projection.rs
b/datafusion/core/src/physical_plan/projection.rs
index ca4a3e54f5..4391922967 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -433,6 +433,7 @@ mod tests {
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::binary;
use futures::future;
+ use tempfile::TempDir;
// Create a binary expression without coercion. Used here when we do not
want to coerce the expressions
// to valid types. Usage can result in an execution (after plan) error.
@@ -451,7 +452,8 @@ mod tests {
let schema = test_util::aggr_test_schema();
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
// pick column c1 and name it column c1 in the output schema
let projection =
@@ -488,7 +490,8 @@ mod tests {
let schema = test_util::aggr_test_schema();
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
// pick column c1 and name it column c1 in the output schema
let projection =
@@ -502,7 +505,8 @@ mod tests {
let schema = test_util::aggr_test_schema();
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
let c1 = col("c2", &schema).unwrap();
let c2 = col("c9", &schema).unwrap();
@@ -519,7 +523,8 @@ mod tests {
async fn project_no_column() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
- let csv = test::scan_partitioned_csv(1)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(1, tmp_dir.path())?;
let expected = collect(csv.execute(0,
task_ctx.clone())?).await.unwrap();
let projection = ProjectionExec::try_new(vec![], csv)?;
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 3bedbd17e8..52936dc55e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -899,12 +899,14 @@ mod tests {
use datafusion_execution::runtime_env::RuntimeConfig;
use futures::FutureExt;
use std::collections::HashMap;
+ use tempfile::TempDir;
#[tokio::test]
async fn test_in_mem_sort() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
let schema = csv.schema();
let sort_exec = Arc::new(SortExec::new(
@@ -973,7 +975,8 @@ mod tests {
);
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
let schema = csv.schema();
let sort_exec = Arc::new(SortExec::new(
@@ -1066,7 +1069,8 @@ mod tests {
.with_session_config(session_config),
);
- let csv = test::scan_partitioned_csv(partitions)?;
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions, tmp_dir.path())?;
let schema = csv.schema();
let sort_exec = Arc::new(
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index bc0ac678f0..97d5813610 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -275,6 +275,7 @@ mod tests {
use arrow::record_batch::RecordBatch;
use datafusion_execution::config::SessionConfig;
use futures::{FutureExt, StreamExt};
+ use tempfile::TempDir;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::expressions::col;
@@ -556,10 +557,11 @@ mod tests {
}
#[tokio::test]
- async fn test_partition_sort() {
+ async fn test_partition_sort() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions).unwrap();
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions,
tmp_dir.path()).unwrap();
let schema = csv.schema();
let sort = vec![
@@ -598,6 +600,8 @@ mod tests {
basic, partition,
"basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
);
+
+ Ok(())
}
// Split the provided record batch into multiple batch_size record batches
@@ -627,18 +631,21 @@ mod tests {
sort: Vec<PhysicalSortExpr>,
sizes: &[usize],
context: Arc<TaskContext>,
- ) -> Arc<dyn ExecutionPlan> {
+ ) -> Result<Arc<dyn ExecutionPlan>> {
let partitions = 4;
- let csv = test::scan_partitioned_csv(partitions).unwrap();
+ let tmp_dir = TempDir::new()?;
+ let csv = test::scan_partitioned_csv(partitions,
tmp_dir.path()).unwrap();
let sorted = basic_sort(csv, sort, context).await;
let split: Vec<_> = sizes.iter().map(|x| split_batch(&sorted,
*x)).collect();
- Arc::new(MemoryExec::try_new(&split, sorted.schema(), None).unwrap())
+ Ok(Arc::new(
+ MemoryExec::try_new(&split, sorted.schema(), None).unwrap(),
+ ))
}
#[tokio::test]
- async fn test_partition_sort_streaming_input() {
+ async fn test_partition_sort_streaming_input() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let sort = vec![
@@ -665,7 +672,8 @@ mod tests {
];
let input =
- sorted_partitioned_input(sort.clone(), &[10, 3, 11],
task_ctx.clone()).await;
+ sorted_partitioned_input(sort.clone(), &[10, 3, 11],
task_ctx.clone())
+ .await?;
let basic = basic_sort(input.clone(), sort.clone(),
task_ctx.clone()).await;
let partition = sorted_merge(input, sort, task_ctx.clone()).await;
@@ -680,10 +688,12 @@ mod tests {
.to_string();
assert_eq!(basic, partition);
+
+ Ok(())
}
#[tokio::test]
- async fn test_partition_sort_streaming_input_output() {
+ async fn test_partition_sort_streaming_input_output() -> Result<()> {
let schema = test_util::aggr_test_schema();
let sort = vec![
@@ -702,7 +712,8 @@ mod tests {
// Test streaming with default batch size
let task_ctx = Arc::new(TaskContext::default());
let input =
- sorted_partitioned_input(sort.clone(), &[10, 5, 13],
task_ctx.clone()).await;
+ sorted_partitioned_input(sort.clone(), &[10, 5, 13],
task_ctx.clone())
+ .await?;
let basic = basic_sort(input.clone(), sort.clone(), task_ctx).await;
// batch size of 23
@@ -726,6 +737,8 @@ mod tests {
.to_string();
assert_eq!(basic, partition);
+
+ Ok(())
}
#[tokio::test]
@@ -810,7 +823,7 @@ mod tests {
}
#[tokio::test]
- async fn test_async() {
+ async fn test_async() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let sort = vec![PhysicalSortExpr {
@@ -819,7 +832,7 @@ mod tests {
}];
let batches =
- sorted_partitioned_input(sort.clone(), &[5, 7, 3],
task_ctx.clone()).await;
+ sorted_partitioned_input(sort.clone(), &[5, 7, 3],
task_ctx.clone()).await?;
let partition_count = batches.output_partitioning().partition_count();
let mut streams = Vec::with_capacity(partition_count);
@@ -874,6 +887,8 @@ mod tests {
basic, partition,
"basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
);
+
+ Ok(())
}
#[tokio::test]
diff --git a/datafusion/core/src/physical_plan/union.rs
b/datafusion/core/src/physical_plan/union.rs
index e29c96da09..034c867040 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -610,14 +610,17 @@ mod tests {
use crate::{physical_plan::collect, scalar::ScalarValue};
use arrow::record_batch::RecordBatch;
+ use tempfile::TempDir;
#[tokio::test]
async fn test_union_partitions() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
+ let tmp_dir = TempDir::new()?;
+
// Create csv's with different partitioning
- let csv = test::scan_partitioned_csv(4)?;
- let csv2 = test::scan_partitioned_csv(5)?;
+ let csv = test::scan_partitioned_csv(4, tmp_dir.path())?;
+ let csv2 = test::scan_partitioned_csv(5, tmp_dir.path())?;
let union_exec = Arc::new(UnionExec::new(vec![csv, csv2]));
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs
b/datafusion/core/src/physical_plan/windows/mod.rs
index cba7df772e..17c21834e1 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -372,9 +372,14 @@ mod tests {
use datafusion_execution::TaskContext;
use datafusion_expr::{create_udaf, Accumulator, Volatility};
use futures::FutureExt;
-
- fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>,
SchemaRef)> {
- let csv = test::scan_partitioned_csv(partitions)?;
+ use std::path::Path;
+ use tempfile::TempDir;
+
+ fn create_test_schema(
+ partitions: usize,
+ work_dir: &Path,
+ ) -> Result<(Arc<CsvExec>, SchemaRef)> {
+ let csv = test::scan_partitioned_csv(partitions, work_dir)?;
let schema = csv.schema();
Ok((csv, schema))
}
@@ -547,7 +552,8 @@ mod tests {
);
let task_ctx = Arc::new(TaskContext::default());
- let (input, schema) = create_test_schema(1)?;
+ let tmp_dir = TempDir::new()?;
+ let (input, schema) = create_test_schema(1, tmp_dir.path())?;
let window_exec = Arc::new(WindowAggExec::try_new(
vec![create_window_expr(
@@ -579,7 +585,8 @@ mod tests {
#[tokio::test]
async fn window_function() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
- let (input, schema) = create_test_schema(1)?;
+ let tmp_dir = TempDir::new()?;
+ let (input, schema) = create_test_schema(1, tmp_dir.path())?;
let window_exec = Arc::new(WindowAggExec::try_new(
vec![
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 6e2bdfeeca..b7bf2a851a 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -47,9 +47,9 @@ use futures::{Future, FutureExt};
use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
+use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
-use tempfile::TempDir;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
#[cfg(feature = "compression")]
@@ -73,7 +73,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
}
/// Returns a [`CsvExec`] that scans "aggregate_test_100.csv" with
`partitions` partitions
-pub fn scan_partitioned_csv(partitions: usize) -> Result<Arc<CsvExec>> {
+pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) ->
Result<Arc<CsvExec>> {
let schema = aggr_test_schema();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv", arrow_test_data());
@@ -83,6 +83,7 @@ pub fn scan_partitioned_csv(partitions: usize) ->
Result<Arc<CsvExec>> {
partitions,
FileType::CSV,
FileCompressionType::UNCOMPRESSED,
+ work_dir,
)?;
let config = partitioned_csv_config(schema, file_groups)?;
Ok(Arc::new(CsvExec::new(
@@ -102,11 +103,10 @@ pub fn partitioned_file_groups(
partitions: usize,
file_type: FileType,
file_compression_type: FileCompressionType,
+ work_dir: &Path,
) -> Result<Vec<Vec<PartitionedFile>>> {
let path = format!("{path}/{filename}");
- let tmp_dir = TempDir::new()?.into_path();
-
let mut writers = vec![];
let mut files = vec![];
for i in 0..partitions {
@@ -118,7 +118,7 @@ pub fn partitioned_file_groups(
.get_ext_with_compression(file_compression_type.to_owned())
.unwrap()
);
- let filename = tmp_dir.join(filename);
+ let filename = work_dir.join(filename);
let file = File::create(&filename).unwrap();