This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3ac92adb41 use FileFormat::get_ext as the default file extension
filter (#12417)
3ac92adb41 is described below
commit 3ac92adb41e08e6ce1c5fdfb92e215bf3151255f
Author: waruto <[email protected]>
AuthorDate: Sun Sep 15 20:20:48 2024 +0800
use FileFormat::get_ext as the default file extension filter (#12417)
* use default file extension filter from FileFormat
* use with_file_extension_opt api
---
datafusion/core/src/datasource/listing/table.rs | 62 ++++++++++++++++++++++---
1 file changed, 55 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 225995ca4f..adf907011b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -228,13 +228,13 @@ pub struct ListingOptions {
impl ListingOptions {
/// Creates an options instance with the given format
/// Default values:
- /// - no file extension filter
+ /// - use default file extension filter
/// - no input partition to discover
/// - one target partition
/// - stat collection
pub fn new(format: Arc<dyn FileFormat>) -> Self {
Self {
- file_extension: String::new(),
+ file_extension: format.get_ext(),
format,
table_partition_cols: vec![],
collect_stat: true,
@@ -1314,6 +1314,7 @@ mod tests {
"test:///bucket/key-prefix/",
12,
5,
+ Some(""),
)
.await?;
@@ -1328,6 +1329,7 @@ mod tests {
"test:///bucket/key-prefix/",
4,
4,
+ Some(""),
)
.await?;
@@ -1343,12 +1345,19 @@ mod tests {
"test:///bucket/key-prefix/",
2,
2,
+ Some(""),
)
.await?;
// no files => no groups
- assert_list_files_for_scan_grouping(&[], "test:///bucket/key-prefix/",
2, 0)
- .await?;
+ assert_list_files_for_scan_grouping(
+ &[],
+ "test:///bucket/key-prefix/",
+ 2,
+ 0,
+ Some(""),
+ )
+ .await?;
// files that don't match the prefix
assert_list_files_for_scan_grouping(
@@ -1360,6 +1369,21 @@ mod tests {
"test:///bucket/key-prefix/",
10,
2,
+ Some(""),
+ )
+ .await?;
+
+ // files that don't match the prefix or the default file extention
+ assert_list_files_for_scan_grouping(
+ &[
+ "bucket/key-prefix/file0.avro",
+ "bucket/key-prefix/file1.parquet",
+ "bucket/other-prefix/roguefile.avro",
+ ],
+ "test:///bucket/key-prefix/",
+ 10,
+ 1,
+ None,
)
.await?;
Ok(())
@@ -1380,6 +1404,7 @@ mod tests {
&["test:///bucket/key1/", "test:///bucket/key2/"],
12,
5,
+ Some(""),
)
.await?;
@@ -1396,6 +1421,7 @@ mod tests {
&["test:///bucket/key1/", "test:///bucket/key2/"],
5,
5,
+ Some(""),
)
.await?;
@@ -1412,11 +1438,13 @@ mod tests {
&["test:///bucket/key1/"],
2,
2,
+ Some(""),
)
.await?;
// no files => no groups
- assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2,
0).await?;
+ assert_list_files_for_multi_paths(&[], &["test:///bucket/key1/"], 2,
0, Some(""))
+ .await?;
// files that don't match the prefix
assert_list_files_for_multi_paths(
@@ -1431,6 +1459,24 @@ mod tests {
&["test:///bucket/key3/"],
2,
1,
+ Some(""),
+ )
+ .await?;
+
+ // files that don't match the prefix or the default file ext
+ assert_list_files_for_multi_paths(
+ &[
+ "bucket/key1/file0.avro",
+ "bucket/key1/file1.csv",
+ "bucket/key1/file2.avro",
+ "bucket/key2/file3.csv",
+ "bucket/key2/file4.avro",
+ "bucket/key3/file5.csv",
+ ],
+ &["test:///bucket/key1/", "test:///bucket/key3/"],
+ 2,
+ 2,
+ None,
)
.await?;
Ok(())
@@ -1458,6 +1504,7 @@ mod tests {
table_prefix: &str,
target_partitions: usize,
output_partitioning: usize,
+ file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f,
10)).collect::<Vec<_>>());
@@ -1465,7 +1512,7 @@ mod tests {
let format = AvroFormat {};
let opt = ListingOptions::new(Arc::new(format))
- .with_file_extension("")
+ .with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);
let schema = Schema::new(vec![Field::new("a", DataType::Boolean,
false)]);
@@ -1491,6 +1538,7 @@ mod tests {
table_prefix: &[&str],
target_partitions: usize,
output_partitioning: usize,
+ file_ext: Option<&str>,
) -> Result<()> {
let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f,
10)).collect::<Vec<_>>());
@@ -1498,7 +1546,7 @@ mod tests {
let format = AvroFormat {};
let opt = ListingOptions::new(Arc::new(format))
- .with_file_extension("")
+ .with_file_extension_opt(file_ext)
.with_target_partitions(target_partitions);
let schema = Schema::new(vec![Field::new("a", DataType::Boolean,
false)]);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]