This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fdb8fecf0 Extract Listing URI logic into ListingTableUrl structure (#2578)
fdb8fecf0 is described below
commit fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon May 30 16:53:42 2022 +0100
Extract Listing URI logic into ListingTableUrl structure (#2578)
* Extract Listing URI logic into ListingTableUrl structure
* Fix explain test CPU count sensitivity
* Windows compatibility
* More windows pacification
* Even more windows pacification
* Fix windows doctest
* Add ObjectStoreUrl
* Update ballista pin
* Format
* Clippy
* Consistent naming
* Review feedback
* Fix windows test
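Callers now parse table paths into a ListingTableUrl up front. A minimal
sketch of the resulting pattern, mirroring the call sites updated below
(the Parquet format, local store and path are placeholders; the import
paths are the re-exports used elsewhere in this diff):

    use std::sync::Arc;
    use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::{
        ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
    };

    async fn build_table(path: &str) -> datafusion::error::Result<ListingTable> {
        // Paths are now parsed up front into a ListingTableUrl, which resolves
        // relative paths, file:// URLs and glob expressions
        let table_path = ListingTableUrl::parse(path)?;
        let store = Arc::new(LocalFileSystem {});

        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let schema = options.infer_schema(store.clone(), &table_path).await?;

        let config = ListingTableConfig::new(store, table_path)
            .with_listing_options(options)
            .with_schema(schema);
        ListingTable::try_new(config)
    }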
---
benchmarks/src/bin/tpch.rs | 4 +-
datafusion-examples/examples/flight_server.rs | 6 +-
datafusion/core/Cargo.toml | 3 +
datafusion/core/benches/sort_limit_query_sql.rs | 7 +-
datafusion/core/src/catalog/schema.rs | 36 ++-
datafusion/core/src/datasource/file_format/mod.rs | 2 +
datafusion/core/src/datasource/listing/helpers.rs | 202 ++++++--------
datafusion/core/src/datasource/listing/mod.rs | 2 +
datafusion/core/src/datasource/listing/table.rs | 63 +++--
datafusion/core/src/datasource/listing/url.rs | 307 +++++++++++++++++++++
datafusion/core/src/datasource/mod.rs | 2 +-
datafusion/core/src/datasource/object_store.rs | 218 +++++++++++++++
.../core/src/datasource/object_store_registry.rs | 138 ---------
datafusion/core/src/execution/context.rs | 87 +++---
datafusion/core/src/execution/runtime_env.rs | 10 +-
datafusion/core/src/lib.rs | 10 +-
.../core/src/physical_optimizer/repartition.rs | 2 +
.../core/src/physical_plan/file_format/avro.rs | 22 +-
.../core/src/physical_plan/file_format/json.rs | 25 +-
.../core/src/physical_plan/file_format/mod.rs | 5 +-
.../core/src/physical_plan/file_format/parquet.rs | 5 +
datafusion/core/src/physical_plan/mod.rs | 7 +-
datafusion/core/src/test/mod.rs | 2 +
datafusion/core/src/test/object_store.rs | 2 +-
datafusion/core/tests/path_partition.rs | 16 +-
datafusion/core/tests/row.rs | 11 +-
datafusion/core/tests/sql/explain_analyze.rs | 25 +-
datafusion/core/tests/sql/json.rs | 2 +-
datafusion/core/tests/sql/mod.rs | 58 +++-
datafusion/data-access/Cargo.toml | 3 +-
datafusion/data-access/src/object_store/local.rs | 26 --
datafusion/data-access/src/object_store/mod.rs | 195 +------------
datafusion/proto/src/logical_plan.rs | 9 +-
dev/build-arrow-ballista.sh | 2 +-
34 files changed, 871 insertions(+), 643 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index ba10d51c2..c46badd64 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -50,6 +50,7 @@ use datafusion::{
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;
use serde::Serialize;
use structopt::StructOpt;
@@ -425,7 +426,8 @@ fn get_table(
table_partition_cols: vec![],
};
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
+ let table_path = ListingTableUrl::parse(path)?;
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}),
table_path)
.with_listing_options(options)
.with_schema(schema);
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 703cb7025..a3d7c0f56 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use arrow_flight::SchemaAsIpc;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use futures::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
@@ -68,9 +68,11 @@ impl FlightService for FlightServiceImpl {
let request = request.into_inner();
let listing_options =
ListingOptions::new(Arc::new(ParquetFormat::default()));
+ let table_path =
+ ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
let schema = listing_options
- .infer_schema(Arc::new(LocalFileSystem {}), &request.path[0])
+ .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
.await
.unwrap();
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f8170443e..08da53f9c 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr",
version = "8.0.0" }
datafusion-row = { path = "../row", version = "8.0.0" }
datafusion-sql = { path = "../sql", version = "8.0.0" }
futures = "0.3"
+glob = "0.3.0"
hashbrown = { version = "0.12", features = ["raw"] }
+itertools = "0.10"
lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
@@ -85,6 +87,7 @@ sqlparser = "0.17"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
+url = "2.2"
uuid = { version = "1.0", features = ["v4"] }
[dev-dependencies]
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs
index faeff6bc9..d1f253a98 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -20,7 +20,9 @@ extern crate criterion;
use criterion::Criterion;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable,
ListingTableConfig};
+use datafusion::datasource::listing::{
+ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
use parking_lot::Mutex;
use std::sync::Arc;
@@ -64,11 +66,12 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+ let table_path = ListingTableUrl::parse(path).unwrap();
// create CSV data source
let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}),
table_path)
.with_listing_options(listing_options)
.with_schema(schema);
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 9ef5c3d67..748cad52b 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -23,8 +23,8 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
-use crate::datasource::listing::{ListingTable, ListingTableConfig};
-use crate::datasource::object_store_registry::ObjectStoreRegistry;
+use crate::datasource::listing::{ListingTable, ListingTableConfig,
ListingTableUrl};
+use crate::datasource::object_store::ObjectStoreRegistry;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use datafusion_data_access::object_store::ObjectStore;
@@ -156,31 +156,33 @@ impl ObjectStoreSchemaProvider {
.register_store(scheme.into(), object_store)
}
- /// Retrieves a `ObjectStore` instance by scheme
- pub fn object_store<'a>(
+ /// Retrieves a `ObjectStore` instance for a given Url
+ pub fn object_store(
&self,
- uri: &'a str,
- ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+ url: impl AsRef<url::Url>,
+ ) -> Result<Arc<dyn ObjectStore>> {
self.object_store_registry
.lock()
- .get_by_uri(uri)
+ .get_by_url(url)
.map_err(DataFusionError::from)
}
/// If supported by the implementation, adds a new table to this schema by creating a
- /// `ListingTable` from the provided `uri` and a previously registered `ObjectStore`.
+ /// `ListingTable` from the provided `url` and a previously registered `ObjectStore`.
/// If a table of the same name existed before, it returns "Table already exists" error.
pub async fn register_listing_table(
&self,
name: &str,
- uri: &str,
+ table_path: ListingTableUrl,
config: Option<ListingTableConfig>,
) -> Result<()> {
let config = match config {
Some(cfg) => cfg,
None => {
- let (object_store, _path) = self.object_store(uri)?;
- ListingTableConfig::new(object_store, uri).infer().await?
+ let object_store = self.object_store(&table_path)?;
+ ListingTableConfig::new(object_store, table_path)
+ .infer()
+ .await?
}
};
@@ -255,6 +257,7 @@ mod tests {
use crate::datasource::empty::EmptyTable;
use crate::execution::context::SessionContext;
+ use crate::datasource::listing::ListingTableUrl;
use futures::StreamExt;
#[tokio::test]
@@ -280,12 +283,13 @@ mod tests {
async fn test_schema_register_listing_table() {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+ let table_path = ListingTableUrl::parse(filename).unwrap();
let schema = ObjectStoreSchemaProvider::new();
let _store = schema.register_object_store("test",
Arc::new(LocalFileSystem {}));
schema
- .register_listing_table("alltypes_plain", &filename, None)
+ .register_listing_table("alltypes_plain", table_path, None)
.await
.unwrap();
@@ -338,8 +342,9 @@ mod tests {
|| file == OsStr::new("alltypes_plain.parquet")
{
let name = path.file_stem().unwrap().to_str().unwrap();
+ let path = ListingTableUrl::parse(&sized_file.path).unwrap();
schema
- .register_listing_table(name, &sized_file.path, None)
+ .register_listing_table(name, path, None)
.await
.unwrap();
}
@@ -360,17 +365,18 @@ mod tests {
async fn test_schema_register_same_listing_table() {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+ let table_path = ListingTableUrl::parse(filename).unwrap();
let schema = ObjectStoreSchemaProvider::new();
let _store = schema.register_object_store("test",
Arc::new(LocalFileSystem {}));
schema
- .register_listing_table("alltypes_plain", &filename, None)
+ .register_listing_table("alltypes_plain", table_path.clone(), None)
.await
.unwrap();
schema
- .register_listing_table("alltypes_plain", &filename, None)
+ .register_listing_table("alltypes_plain", table_path, None)
.await
.unwrap();
}
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 669ed0efd..eae86fa9c 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -85,6 +85,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
pub(crate) mod test_util {
use super::*;
use crate::datasource::listing::PartitionedFile;
+ use crate::datasource::object_store::ObjectStoreUrl;
use datafusion_data_access::object_store::local::{
local_unpartitioned_file, LocalFileSystem,
};
@@ -115,6 +116,7 @@ pub(crate) mod test_util {
.create_physical_plan(
FileScanConfig {
object_store: store,
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema,
file_groups,
statistics,
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 11a91f2ee..a26eafabb 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -17,7 +17,6 @@
//! Helper functions for the table implementation
-use std::path::{Component, Path};
use std::sync::Arc;
use arrow::{
@@ -29,11 +28,7 @@ use arrow::{
record_batch::RecordBatch,
};
use chrono::{TimeZone, Utc};
-use datafusion_common::DataFusionError;
-use futures::{
- stream::{self},
- StreamExt, TryStreamExt,
-};
+use futures::{stream::BoxStream, TryStreamExt};
use log::debug;
use crate::{
@@ -44,7 +39,8 @@ use crate::{
scalar::ScalarValue,
};
-use super::{PartitionedFile, PartitionedFileStream};
+use super::PartitionedFile;
+use crate::datasource::listing::ListingTableUrl;
use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile};
use datafusion_expr::Volatility;
@@ -161,94 +157,53 @@ pub fn split_files(
/// TODO for tables with many files (10k+), it will usually be more efficient
/// to first list the folders relative to the first partition dimension,
/// prune those, then list only the contents of the remaining folders.
-pub async fn pruned_partition_list(
- store: &dyn ObjectStore,
- table_path: &str,
+pub async fn pruned_partition_list<'a>(
+ store: &'a dyn ObjectStore,
+ table_path: &'a ListingTableUrl,
filters: &[Expr],
- file_extension: &str,
- table_partition_cols: &[String],
-) -> Result<PartitionedFileStream> {
+ file_extension: &'a str,
+ table_partition_cols: &'a [String],
+) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
+ let list = table_path.list_all_files(store, file_extension);
+
// if no partition col => simply list all the files
if table_partition_cols.is_empty() {
- return Ok(Box::pin(
- store
- .glob_file_with_suffix(table_path, file_extension)
- .await?
- .map(|f| {
- Ok(PartitionedFile {
- partition_values: vec![],
- file_meta: f?,
- range: None,
- })
- }),
- ));
+ return Ok(Box::pin(list.map_ok(|object_meta| object_meta.into())));
}
let applicable_filters: Vec<_> = filters
.iter()
.filter(|f| expr_applicable_for_cols(table_partition_cols, f))
.collect();
- let stream_path = table_path.to_owned();
+
if applicable_filters.is_empty() {
// Parse the partition values while listing all the files
// Note: We might avoid parsing the partition values if they are not used in any projection,
// but the cost of parsing will likely be far dominated by the time to fetch the listing from
// the object store.
- let table_partition_cols_stream = table_partition_cols.to_vec();
- Ok(Box::pin(
- store
- .glob_file_with_suffix(table_path, file_extension)
- .await?
- .filter_map(move |f| {
- let stream_path = stream_path.clone();
- let table_partition_cols_stream =
table_partition_cols_stream.clone();
- async move {
- let file_meta = match f {
- Ok(fm) => fm,
- Err(err) => return Some(Err(err)),
- };
- let parsed_path = parse_partitions_for_path(
- &stream_path,
- file_meta.path(),
- &table_partition_cols_stream,
- )
- .map(|p| {
- p.iter()
- .map(|&pn|
ScalarValue::Utf8(Some(pn.to_owned())))
- .collect()
- });
-
- parsed_path.map(|partition_values| {
- Ok(PartitionedFile {
- partition_values,
- file_meta,
- range: None,
- })
- })
- }
- }),
- ))
+ Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
+ let parsed_path = parse_partitions_for_path(
+ table_path,
+ file_meta.path(),
+ table_partition_cols,
+ )
+ .map(|p| {
+ p.iter()
+ .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
+ .collect()
+ });
+
+ Ok(parsed_path.map(|partition_values| PartitionedFile {
+ partition_values,
+ file_meta,
+ range: None,
+ }))
+ })))
} else {
// parse the partition values and serde them as a RecordBatch to
filter them
- // TODO avoid collecting but have a streaming memory table instead
- let batches: Vec<RecordBatch> = store
- .glob_file_with_suffix(table_path, file_extension)
- .await?
- // TODO we set an arbitrary high batch size here, it does not
matter as we list
- // all the files anyway. This number will need to be adjusted
according to the object
- // store if we switch to a streaming-stlye pruning of the files.
For instance S3 lists
- // 1000 items at a time so batches of 1000 would be ideal with S3
as store.
- .chunks(1024)
- .map(|v| {
- v.into_iter()
- .collect::<datafusion_data_access::Result<Vec<_>>>()
- })
- .map_err(DataFusionError::IoError)
- .map(move |metas| paths_to_batch(table_partition_cols,
&stream_path, &metas?))
- .try_collect()
- .await?;
-
- let mem_table = MemTable::try_new(batches[0].schema(), vec![batches])?;
+ let metas: Vec<_> = list.try_collect().await?;
+ let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
+ let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
// Filter the partitions using a local datafusion context
// TODO having the external context would allow us to resolve `Volatility::Stable`
@@ -260,7 +215,7 @@ pub async fn pruned_partition_list(
}
let filtered_batches = df.collect().await?;
- Ok(Box::pin(stream::iter(
+ Ok(Box::pin(futures::stream::iter(
batches_to_paths(&filtered_batches).into_iter().map(Ok),
)))
}
@@ -275,7 +230,7 @@ pub async fn pruned_partition_list(
/// Note: For the last modified date, this loses any precision higher than millisecond.
fn paths_to_batch(
table_partition_cols: &[String],
- table_path: &str,
+ table_path: &ListingTableUrl,
metas: &[FileMeta],
) -> Result<RecordBatch> {
let mut key_builder = StringBuilder::new(metas.len());
@@ -373,21 +328,15 @@ fn batches_to_paths(batches: &[RecordBatch]) ->
Vec<PartitionedFile> {
/// Extract the partition values for the given `file_path` (in the given `table_path`)
/// associated to the partitions defined by `table_partition_cols`
fn parse_partitions_for_path<'a>(
- table_path: &str,
+ table_path: &ListingTableUrl,
file_path: &'a str,
table_partition_cols: &[String],
) -> Option<Vec<&'a str>> {
- let subpath = file_path.strip_prefix(table_path)?;
-
- // split subpath into components ignoring leading separator if exists
- let subpath = Path::new(subpath)
- .components()
- .filter(|c| !matches!(c, Component::RootDir))
- .filter_map(|c| c.as_os_str().to_str());
+ let subpath = table_path.strip_prefix(file_path)?;
let mut part_values = vec![];
- for (path, pn) in subpath.zip(table_partition_cols) {
- match path.split_once('=') {
+ for (part, pn) in subpath.zip(table_partition_cols) {
+ match part.split_once('=') {
Some((name, val)) if name == pn => part_values.push(val),
_ => return None,
}
@@ -401,6 +350,7 @@ mod tests {
logical_plan::{case, col, lit},
test::object_store::TestObjectStore,
};
+ use futures::StreamExt;
use super::*;
@@ -453,7 +403,7 @@ mod tests {
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
store.as_ref(),
- "tablepath/",
+ &ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[String::from("mypartition")],
@@ -476,33 +426,34 @@ mod tests {
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
store.as_ref(),
- "tablepath/",
+ &ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[String::from("mypartition")],
)
.await
.expect("partition pruning failed")
- .collect::<Vec<_>>()
- .await;
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
assert_eq!(pruned.len(), 2);
- let f1 = pruned[0].as_ref().expect("first item not an error");
+ let f1 = &pruned[0];
assert_eq!(
- &f1.file_meta.sized_file.path,
+ f1.file_meta.path(),
"tablepath/mypartition=val1/file.parquet"
);
assert_eq!(
&f1.partition_values,
&[ScalarValue::Utf8(Some(String::from("val1"))),]
);
- let f2 = pruned[1].as_ref().expect("second item not an error");
+ let f2 = &pruned[1];
assert_eq!(
- &f2.file_meta.sized_file.path,
+ f2.file_meta.path(),
"tablepath/mypartition=val1/other=val3/file.parquet"
);
assert_eq!(
- &f2.partition_values,
+ f2.partition_values,
&[ScalarValue::Utf8(Some(String::from("val1"))),]
);
}
@@ -522,20 +473,21 @@ mod tests {
let filter3 = Expr::eq(col("part2"), col("other"));
let pruned = pruned_partition_list(
store.as_ref(),
- "tablepath/",
+ &ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2, filter3],
".parquet",
&[String::from("part1"), String::from("part2")],
)
.await
.expect("partition pruning failed")
- .collect::<Vec<_>>()
- .await;
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
assert_eq!(pruned.len(), 2);
- let f1 = pruned[0].as_ref().expect("first item not an error");
+ let f1 = &pruned[0];
assert_eq!(
- &f1.file_meta.sized_file.path,
+ f1.file_meta.path(),
"tablepath/part1=p1v2/part2=p2v1/file1.parquet"
);
assert_eq!(
@@ -545,9 +497,9 @@ mod tests {
ScalarValue::Utf8(Some(String::from("p2v1")))
]
);
- let f2 = pruned[1].as_ref().expect("second item not an error");
+ let f2 = &pruned[1];
assert_eq!(
- &f2.file_meta.sized_file.path,
+ f2.file_meta.path(),
"tablepath/part1=p1v2/part2=p2v1/file2.parquet"
);
assert_eq!(
@@ -563,12 +515,16 @@ mod tests {
fn test_parse_partitions_for_path() {
assert_eq!(
Some(vec![]),
- parse_partitions_for_path("bucket/mytable",
"bucket/mytable/file.csv", &[])
+ parse_partitions_for_path(
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+ "bucket/mytable/file.csv",
+ &[]
+ )
);
assert_eq!(
None,
parse_partitions_for_path(
- "bucket/othertable",
+ &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
"bucket/mytable/file.csv",
&[]
)
@@ -576,7 +532,7 @@ mod tests {
assert_eq!(
None,
parse_partitions_for_path(
- "bucket/mytable",
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
"bucket/mytable/file.csv",
&[String::from("mypartition")]
)
@@ -584,7 +540,7 @@ mod tests {
assert_eq!(
Some(vec!["v1"]),
parse_partitions_for_path(
- "bucket/mytable",
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
"bucket/mytable/mypartition=v1/file.csv",
&[String::from("mypartition")]
)
@@ -592,7 +548,7 @@ mod tests {
assert_eq!(
Some(vec!["v1"]),
parse_partitions_for_path(
- "bucket/mytable/",
+ &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
"bucket/mytable/mypartition=v1/file.csv",
&[String::from("mypartition")]
)
@@ -601,7 +557,7 @@ mod tests {
assert_eq!(
None,
parse_partitions_for_path(
- "bucket/mytable",
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
"bucket/mytable/v1/file.csv",
&[String::from("mypartition")]
)
@@ -609,7 +565,7 @@ mod tests {
assert_eq!(
Some(vec!["v1", "v2"]),
parse_partitions_for_path(
- "bucket/mytable",
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
"bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
&[String::from("mypartition"), String::from("otherpartition")]
)
@@ -617,7 +573,7 @@ mod tests {
assert_eq!(
Some(vec!["v1"]),
parse_partitions_for_path(
- "bucket/mytable",
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
"bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
&[String::from("mypartition")]
)
@@ -630,7 +586,7 @@ mod tests {
assert_eq!(
Some(vec!["v1"]),
parse_partitions_for_path(
- "bucket\\mytable",
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
"bucket\\mytable\\mypartition=v1\\file.csv",
&[String::from("mypartition")]
)
@@ -638,7 +594,7 @@ mod tests {
assert_eq!(
Some(vec!["v1", "v2"]),
parse_partitions_for_path(
- "bucket\\mytable",
+ &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
"bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv",
&[String::from("mypartition"), String::from("otherpartition")]
)
@@ -664,7 +620,8 @@ mod tests {
},
];
- let batches = paths_to_batch(&[], "mybucket/tablepath", &files)
+ let table_path =
ListingTableUrl::parse("file:///mybucket/tablepath").unwrap();
+ let batches = paths_to_batch(&[], &table_path, &files)
.expect("Serialization of file list to batch failed");
let parsed_files = batches_to_paths(&[batches]);
@@ -698,9 +655,12 @@ mod tests {
},
];
- let batches =
- paths_to_batch(&[String::from("part1")], "mybucket/tablepath",
&files)
- .expect("Serialization of file list to batch failed");
+ let batches = paths_to_batch(
+ &[String::from("part1")],
+ &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
+ &files,
+ )
+ .expect("Serialization of file list to batch failed");
let parsed_files = batches_to_paths(&[batches]);
assert_eq!(parsed_files.len(), 2);
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 0f0a7d20e..c11de5f80 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -20,12 +20,14 @@
mod helpers;
mod table;
+mod url;
use datafusion_common::ScalarValue;
use datafusion_data_access::{FileMeta, Result, SizedFile};
use futures::Stream;
use std::pin::Pin;
+pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
/// Stream of files get listed from object store
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 1dceb8b35..34e44971d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,7 +28,9 @@ use crate::datasource::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
parquet::ParquetFormat,
FileFormat,
},
- get_statistics_with_limit, TableProvider, TableType,
+ get_statistics_with_limit,
+ listing::ListingTableUrl,
+ TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
@@ -51,7 +53,7 @@ pub struct ListingTableConfig {
/// `ObjectStore` that contains the files for the `ListingTable`.
pub object_store: Arc<dyn ObjectStore>,
/// Path on the `ObjectStore` for creating `ListingTable`.
- pub table_path: String,
+ pub table_path: ListingTableUrl,
/// Optional `SchemaRef` for the to be created `ListingTable`.
pub file_schema: Option<SchemaRef>,
/// Optional `ListingOptions` for the to be created `ListingTable`.
@@ -60,13 +62,10 @@ pub struct ListingTableConfig {
impl ListingTableConfig {
/// Creates new `ListingTableConfig`. The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
- pub fn new(
- object_store: Arc<dyn ObjectStore>,
- table_path: impl Into<String>,
- ) -> Self {
+ pub fn new(object_store: Arc<dyn ObjectStore>, table_path:
ListingTableUrl) -> Self {
Self {
object_store,
- table_path: table_path.into(),
+ table_path,
file_schema: None,
options: None,
}
@@ -106,18 +105,18 @@ impl ListingTableConfig {
/// Infer `ListingOptions` based on `table_path` suffix.
pub async fn infer_options(self) -> Result<Self> {
- let mut files = self.object_store.list_file(&self.table_path).await?;
- let file = files
+ let file = self
+ .table_path
+ .list_all_files(self.object_store.as_ref(), "")
.next()
.await
.ok_or_else(|| DataFusionError::Internal("No files for
table".into()))??;
- let tokens: Vec<&str> = file.path().split('.').collect();
- let file_type = tokens.last().ok_or_else(|| {
+ let file_type = file.path().rsplit('.').next().ok_or_else(|| {
DataFusionError::Internal("Unable to infer file suffix".into())
})?;
- let format = ListingTableConfig::infer_format(*file_type)?;
+ let format = ListingTableConfig::infer_format(file_type)?;
let listing_options = ListingOptions {
format,
@@ -140,7 +139,7 @@ impl ListingTableConfig {
match self.options {
Some(options) => {
let schema = options
- .infer_schema(self.object_store.clone(),
self.table_path.as_str())
+ .infer_schema(self.object_store.clone(), &self.table_path)
.await?;
Ok(Self {
@@ -213,10 +212,9 @@ impl ListingOptions {
pub async fn infer_schema<'a>(
&'a self,
store: Arc<dyn ObjectStore>,
- path: &'a str,
+ table_path: &'a ListingTableUrl,
) -> Result<SchemaRef> {
- let extension = &self.file_extension;
- let list_stream = store.glob_file_with_suffix(path, extension).await?;
+ let list_stream = table_path.list_all_files(store.as_ref(),
&self.file_extension);
let files: Vec<_> = list_stream.try_collect().await?;
self.format.infer_schema(&store, &files).await
}
@@ -226,7 +224,7 @@ impl ListingOptions {
/// or file system listing capability to get the list of files.
pub struct ListingTable {
object_store: Arc<dyn ObjectStore>,
- table_path: String,
+ table_path: ListingTableUrl,
/// File fields only
file_schema: SchemaRef,
/// File fields + partition columns
@@ -276,10 +274,12 @@ impl ListingTable {
pub fn object_store(&self) -> &Arc<dyn ObjectStore> {
&self.object_store
}
+
/// Get path ref
- pub fn table_path(&self) -> &str {
+ pub fn table_path(&self) -> &ListingTableUrl {
&self.table_path
}
+
/// Get options ref
pub fn options(&self) -> &ListingOptions {
&self.options
@@ -322,6 +322,7 @@ impl TableProvider for ListingTable {
.create_physical_plan(
FileScanConfig {
object_store: Arc::clone(&self.object_store),
+ object_store_url: self.table_path.object_store(),
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
statistics,
@@ -369,6 +370,7 @@ impl ListingTable {
.await?;
// collect the statistics if required by the config
+ // TODO: Collect statistics and schema in single-pass
let object_store = Arc::clone(&self.object_store);
let files = file_list.then(move |part_file| {
let object_store = object_store.clone();
@@ -436,11 +438,12 @@ mod tests {
async fn load_table_stats_by_default() -> Result<()> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+ let table_path = ListingTableUrl::parse(filename).unwrap();
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema = opt
- .infer_schema(Arc::new(LocalFileSystem {}), &filename)
+ .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
.await?;
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}),
filename)
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}),
table_path)
.with_listing_options(opt)
.with_schema(schema);
let table = ListingTable::try_new(config)?;
@@ -464,9 +467,10 @@ mod tests {
collect_stat: true,
};
+ let table_path = ListingTableUrl::parse("file:///table/").unwrap();
let file_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean,
false)]));
- let config = ListingTableConfig::new(store, "table/")
+ let config = ListingTableConfig::new(store, table_path)
.with_listing_options(opt)
.with_schema(file_schema);
let table = ListingTable::try_new(config)?;
@@ -504,7 +508,7 @@ mod tests {
"bucket/key-prefix/file3",
"bucket/key-prefix/file4",
],
- "bucket/key-prefix/",
+ "file:///bucket/key-prefix/",
12,
5,
)
@@ -518,7 +522,7 @@ mod tests {
"bucket/key-prefix/file2",
"bucket/key-prefix/file3",
],
- "bucket/key-prefix/",
+ "file:///bucket/key-prefix/",
4,
4,
)
@@ -533,14 +537,15 @@ mod tests {
"bucket/key-prefix/file3",
"bucket/key-prefix/file4",
],
- "bucket/key-prefix/",
+ "file:///bucket/key-prefix/",
2,
2,
)
.await?;
// no files => no groups
- assert_list_files_for_scan_grouping(&[], "bucket/key-prefix/", 2,
0).await?;
+ assert_list_files_for_scan_grouping(&[], "file:///bucket/key-prefix/",
2, 0)
+ .await?;
// files that don't match the prefix
assert_list_files_for_scan_grouping(
@@ -549,7 +554,7 @@ mod tests {
"bucket/key-prefix/file1",
"bucket/other-prefix/roguefile",
],
- "bucket/key-prefix/",
+ "file:///bucket/key-prefix/",
10,
2,
)
@@ -560,7 +565,8 @@ mod tests {
async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}),
filename)
+ let table_path = ListingTableUrl::parse(filename).unwrap();
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}),
table_path)
.infer()
.await?;
let table = ListingTable::try_new(config)?;
@@ -590,7 +596,8 @@ mod tests {
let schema = Schema::new(vec![Field::new("a", DataType::Boolean,
false)]);
- let config = ListingTableConfig::new(mock_store,
table_prefix.to_owned())
+ let table_path = ListingTableUrl::parse(table_prefix).unwrap();
+ let config = ListingTableConfig::new(mock_store, table_path)
.with_listing_options(opt)
.with_schema(Arc::new(schema));
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
new file mode 100644
index 000000000..041a6ab7f
--- /dev/null
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+ /// A URL that identifies a file or directory to list files from
+ url: Url,
+ /// An optional glob expression used to filter files
+ glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+ /// Parse a provided string as a `ListingTableUrl`
+ ///
+ /// # Paths without a Scheme
+ ///
+ /// If no scheme is provided, or the string is an absolute filesystem path
+ /// as determined by [`std::path::Path::is_absolute`], the string will be
+ /// interpreted as a path on the local filesystem using the operating
+ /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+ ///
+ /// If the path contains any of `'?', '*', '['`, it will be considered
+ /// a glob expression and resolved as described in the section below.
+ ///
+ /// Otherwise, the path will be resolved to an absolute path, returning
+ /// an error if it does not exist, and converted to a [file URI]
+ ///
+ /// If you wish to specify a path that does not exist on the local
+ /// machine you must provide it as a fully-qualified [file URI]
+ /// e.g. `file:///myfile.txt`
+ ///
+ /// ## Glob File Paths
+ ///
+ /// If no scheme is provided, and the path contains a glob expression, it will
+ /// be resolved as follows.
+ ///
+ /// The string up to the first path segment containing a glob expression will be extracted,
+ /// and resolved in the same manner as a normal scheme-less path. That is, resolved to
+ /// an absolute path on the local filesystem, returning an error if it does not exist,
+ /// and converted to a [file URI]
+ ///
+ /// The remaining string will be interpreted as a [`glob::Pattern`] and used as a
+ /// filter when listing files from object storage
+ ///
+ /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+ pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+ let s = s.as_ref();
+
+ // This is necessary to handle the case of a path starting with a drive letter
+ if std::path::Path::new(s).is_absolute() {
+ return Self::parse_path(s);
+ }
+
+ match Url::parse(s) {
+ Ok(url) => Ok(Self { url, glob: None }),
+ Err(url::ParseError::RelativeUrlWithoutBase) =>
Self::parse_path(s),
+ Err(e) => Err(DataFusionError::External(Box::new(e))),
+ }
+ }
+
+ /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+ fn parse_path(s: &str) -> Result<Self> {
+ let (prefix, glob) = match split_glob_expression(s) {
+ Some((prefix, glob)) => {
+ let glob = Pattern::new(glob)
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ (prefix, Some(glob))
+ }
+ None => (s, None),
+ };
+
+ let path = std::path::Path::new(prefix).canonicalize()?;
+ let url = match path.is_file() {
+ true => Url::from_file_path(path).unwrap(),
+ false => Url::from_directory_path(path).unwrap(),
+ };
+
+ Ok(Self { url, glob })
+ }
+
+ /// Returns the URL scheme
+ pub fn scheme(&self) -> &str {
+ self.url.scheme()
+ }
+
+ /// Returns the path as expected by [`ObjectStore`]
+ ///
+ /// In particular for file scheme URLs, this is an absolute path
+ /// on the local filesystem in the OS-specific path representation
+ ///
+ /// For other URLs, this is the host and path of the URL,
+ /// delimited by `/`, and with no leading `/`
+ ///
+ /// TODO: Handle paths consistently (#2489)
+ fn prefix(&self) -> &str {
+ match self.scheme() {
+ "file" => match cfg!(target_family = "windows") {
+ true => self.url.path().strip_prefix('/').unwrap(),
+ false => self.url.path(),
+ },
+ _ =>
&self.url[url::Position::BeforeHost..url::Position::AfterPath],
+ }
+ }
+
+ /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
+ /// an iterator of the remaining path segments
+ ///
+ /// TODO: Handle paths consistently (#2489)
+ pub(crate) fn strip_prefix<'a, 'b: 'a>(
+ &'a self,
+ path: &'b str,
+ ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+ let prefix = self.prefix();
+ // Ignore empty path segments
+ let diff = itertools::diff_with(
+ path.split(is_separator).filter(|s| !s.is_empty()),
+ prefix.split(is_separator).filter(|s| !s.is_empty()),
+ |a, b| a == b,
+ );
+
+ match diff {
+ // Match with remaining
+ Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+ _ => None,
+ }
+ }
+
+ /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
+ pub(crate) fn list_all_files<'a>(
+ &'a self,
+ store: &'a dyn ObjectStore,
+ file_extension: &'a str,
+ ) -> BoxStream<'a, Result<FileMeta>> {
+ futures::stream::once(async move {
+ let prefix = self.prefix();
+ store.list_file(prefix.as_ref()).await
+ })
+ .try_flatten()
+ .map_err(DataFusionError::IoError)
+ .try_filter(move |meta| {
+ let path = meta.path();
+
+ let extension_match = path.ends_with(file_extension);
+ let glob_match = match &self.glob {
+ Some(glob) => match self.strip_prefix(path) {
+ Some(mut segments) => {
+ let stripped = segments.join("/");
+ glob.matches(&stripped)
+ }
+ None => false,
+ },
+ None => true,
+ };
+
+ futures::future::ready(extension_match && glob_match)
+ })
+ .boxed()
+ }
+
+ /// Returns this [`ListingTableUrl`] as a string
+ pub fn as_str(&self) -> &str {
+ self.as_ref()
+ }
+
+ /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+ pub fn object_store(&self) -> ObjectStoreUrl {
+ let url =
&self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+ ObjectStoreUrl::parse(url).unwrap()
+ }
+}
+
+impl AsRef<str> for ListingTableUrl {
+ fn as_ref(&self) -> &str {
+ self.url.as_ref()
+ }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+ fn as_ref(&self) -> &Url {
+ &self.url
+ }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ self.as_str().fmt(f)
+ }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, returning
+/// `None` if no glob expression is found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+ let mut last_separator = 0;
+
+ for (byte_idx, char) in path.char_indices() {
+ if GLOB_START_CHARS.contains(&char) {
+ if last_separator == 0 {
+ return Some((".", path));
+ }
+ return Some(path.split_at(last_separator));
+ }
+
+ if std::path::is_separator(char) {
+ last_separator = byte_idx + char.len_utf8();
+ }
+ }
+ None
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_prefix_path() {
+ let root = std::env::current_dir().unwrap();
+ let root = root.to_string_lossy();
+
+ let url = ListingTableUrl::parse(&root).unwrap();
+ let child = format!("{}/partition/file", root);
+
+ let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+ assert_eq!(prefix, vec!["partition", "file"]);
+ }
+
+ #[test]
+ fn test_prefix_s3() {
+ let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+ assert_eq!(url.prefix(), "bucket/foo/bar");
+
+ let path = "bucket/foo/bar/partition/foo.parquet";
+ let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+ assert_eq!(prefix, vec!["partition", "foo.parquet"]);
+
+ let path = "other-bucket/foo/bar/partition/foo.parquet";
+ assert!(url.strip_prefix(path).is_none());
+ }
+
+ #[test]
+ fn test_split_glob() {
+ fn test(input: &str, expected: Option<(&str, &str)>) {
+ assert_eq!(
+ split_glob_expression(input),
+ expected,
+ "testing split_glob_expression with {}",
+ input
+ );
+ }
+
+ // no glob patterns
+ test("/", None);
+ test("/a.txt", None);
+ test("/a", None);
+ test("/a/", None);
+ test("/a/b", None);
+ test("/a/b/", None);
+ test("/a/b.txt", None);
+ test("/a/b/c.txt", None);
+ // glob patterns, thus we build the longest path (os-specific)
+ test("*.txt", Some((".", "*.txt")));
+ test("/*.txt", Some(("/", "*.txt")));
+ test("/a/*b.txt", Some(("/a/", "*b.txt")));
+ test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
+ test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
+ test("/a/b*.txt", Some(("/a/", "b*.txt")));
+ test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
+
+ // https://github.com/apache/arrow-datafusion/issues/2465
+ test(
+ "/a/b/c//alltypes_plain*.parquet",
+ Some(("/a/b/c//", "alltypes_plain*.parquet")),
+ );
+ }
+}
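With ListingTableUrl in place, glob expressions also work through the
SessionContext read APIs updated later in this diff. A hedged sketch (the
directory layout is hypothetical, and ParquetReadOptions is assumed to be
re-exported from the prelude):

    use datafusion::prelude::*;

    async fn read_partitioned() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // The path is routed through ListingTableUrl::parse: the non-glob
        // prefix ("/data/") is canonicalized to a file:// URL and the rest
        // becomes a glob::Pattern used to filter the listed files
        let df = ctx
            .read_parquet("/data/date=2022*/*.parquet", ParquetReadOptions::default())
            .await?;
        df.show_limit(10).await?;
        Ok(())
    }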
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index f3cc0a04e..65fc2adcb 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -23,7 +23,7 @@ pub mod empty;
pub mod file_format;
pub mod listing;
pub mod memory;
-pub mod object_store_registry;
+pub mod object_store;
pub mod view;
use futures::Stream;
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
new file mode 100644
index 000000000..ac7e1a847
--- /dev/null
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme for each store.
+//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem,
LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+ url: Url,
+}
+
+impl ObjectStoreUrl {
+ /// Parse an [`ObjectStoreUrl`] from a string
+ pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+ let mut parsed =
+ Url::parse(s.as_ref()).map_err(|e|
DataFusionError::External(Box::new(e)))?;
+
+ let remaining = &parsed[url::Position::BeforePath..];
+ if !remaining.is_empty() && remaining != "/" {
+ return Err(DataFusionError::Execution(format!(
+ "ObjectStoreUrl must only contain scheme and authority, got:
{}",
+ remaining
+ )));
+ }
+
+ // Always set path for consistency
+ parsed.set_path("/");
+ Ok(Self { url: parsed })
+ }
+
+ /// An [`ObjectStoreUrl`] for the local filesystem
+ pub fn local_filesystem() -> Self {
+ Self::parse("file://").unwrap()
+ }
+
+ /// Returns this [`ObjectStoreUrl`] as a string
+ pub fn as_str(&self) -> &str {
+ self.as_ref()
+ }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+ fn as_ref(&self) -> &str {
+ self.url.as_ref()
+ }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+ fn as_ref(&self) -> &Url {
+ &self.url
+ }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ self.as_str().fmt(f)
+ }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+ /// A map from scheme to object store that serves list / read operations for the store
+ pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ f.debug_struct("ObjectStoreRegistry")
+ .field(
+ "schemes",
+ &self.object_stores.read().keys().collect::<Vec<_>>(),
+ )
+ .finish()
+ }
+}
+
+impl Default for ObjectStoreRegistry {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ObjectStoreRegistry {
+ /// Create the registry that object stores can be registered into.
+ /// [`LocalFileSystem`] store is registered in by default to support reading local files natively.
+ pub fn new() -> Self {
+ let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+ map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+ Self {
+ object_stores: RwLock::new(map),
+ }
+ }
+
+ /// Adds a new store to this registry.
+ /// If a store of the same prefix existed before, it is replaced in the registry and returned.
+ pub fn register_store(
+ &self,
+ scheme: String,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ let mut stores = self.object_stores.write();
+ stores.insert(scheme, store)
+ }
+
+ /// Get the store registered for scheme
+ pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+ let stores = self.object_stores.read();
+ stores.get(scheme).cloned()
+ }
+
+ /// Get a suitable store for the provided URL. For example:
+ ///
+ /// - URL with scheme `file://` or no scheme will return the default LocalFS store
+ /// - URL with scheme `s3://` will return the S3 store if it's registered
+ ///
+ pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn
ObjectStore>> {
+ let url = url.as_ref();
+ let store = self.get(url.scheme()).ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "No suitable object store found for {}",
+ url
+ ))
+ })?;
+
+ Ok(store)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::datasource::listing::ListingTableUrl;
+ use datafusion_data_access::object_store::local::LocalFileSystem;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_object_store_url() {
+ let listing = ListingTableUrl::parse("file:///").unwrap();
+ let store = listing.object_store();
+ assert_eq!(store.as_str(), "file:///");
+
+ let file = ObjectStoreUrl::parse("file://").unwrap();
+ assert_eq!(file.as_str(), "file:///");
+
+ let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
+ let store = listing.object_store();
+ assert_eq!(store.as_str(), "s3://bucket/");
+
+ let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
+ assert_eq!(url.as_str(), "s3://bucket/");
+
+ let url =
ObjectStoreUrl::parse("s3://username:password@host:123").unwrap();
+ assert_eq!(url.as_str(), "s3://username:password@host:123/");
+
+ let err = ObjectStoreUrl::parse("s3://bucket:invalid").unwrap_err();
+ assert_eq!(err.to_string(), "External error: invalid port number");
+
+ let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err();
+ assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only
contain scheme and authority, got: ?");
+
+ let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err();
+ assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only
contain scheme and authority, got: ?foo=bar");
+
+ let err = ObjectStoreUrl::parse("s3://host:123/foo").unwrap_err();
+ assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only
contain scheme and authority, got: /foo");
+
+ let err =
+
ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err();
+ assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only
contain scheme and authority, got: /foo");
+ }
+
+ #[test]
+ fn test_get_by_url_s3() {
+ let sut = ObjectStoreRegistry::default();
+ sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));
+ let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
+ sut.get_by_url(&url).unwrap();
+ }
+
+ #[test]
+ fn test_get_by_url_file() {
+ let sut = ObjectStoreRegistry::default();
+ let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
+ sut.get_by_url(&url).unwrap();
+ }
+
+ #[test]
+ fn test_get_by_url_local() {
+ let sut = ObjectStoreRegistry::default();
+ let url = ListingTableUrl::parse("../").unwrap();
+ sut.get_by_url(&url).unwrap();
+ }
+}
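A hedged sketch of how the new registry is looked up by URL rather than by
splitting a raw URI string (the "s3" registration reuses LocalFileSystem as a
stand-in, exactly as the tests above do):

    use std::sync::Arc;
    use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::datasource::object_store::{ObjectStoreRegistry, ObjectStoreUrl};

    fn resolve_store() -> datafusion::error::Result<()> {
        let registry = ObjectStoreRegistry::new();
        // Register a store under the "s3" scheme; LocalFileSystem stands in
        // for a real S3 implementation here
        registry.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));

        // Both ListingTableUrl and ObjectStoreUrl implement AsRef<url::Url>,
        // so either can be used to resolve the store for its scheme
        let table = ListingTableUrl::parse("s3://bucket/path/to/table/")?;
        let _store = registry.get_by_url(&table)?;

        let url = ObjectStoreUrl::parse("s3://bucket")?;
        let _store = registry.get_by_url(&url)?;
        Ok(())
    }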
diff --git a/datafusion/core/src/datasource/object_store_registry.rs b/datafusion/core/src/datasource/object_store_registry.rs
deleted file mode 100644
index 70336af02..000000000
--- a/datafusion/core/src/datasource/object_store_registry.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme
for each store.
-//! This allows the user to extend DataFusion with different storage systems
such as S3 or HDFS
-//! and query data inside these systems.
-
-use datafusion_common::{DataFusionError, Result};
-use datafusion_data_access::object_store::local::{LocalFileSystem,
LOCAL_SCHEME};
-use datafusion_data_access::object_store::ObjectStore;
-use parking_lot::RwLock;
-use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
-
-/// Object store registry
-pub struct ObjectStoreRegistry {
- /// A map from scheme to object store that serve list / read operations
for the store
- pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
-}
-
-impl fmt::Debug for ObjectStoreRegistry {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("ObjectStoreRegistry")
- .field(
- "schemes",
- &self.object_stores.read().keys().collect::<Vec<_>>(),
- )
- .finish()
- }
-}
-
-impl Default for ObjectStoreRegistry {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl ObjectStoreRegistry {
- /// Create the registry that object stores can registered into.
- /// ['LocalFileSystem'] store is registered in by default to support read
local files natively.
- pub fn new() -> Self {
- let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
- map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
-
- Self {
- object_stores: RwLock::new(map),
- }
- }
-
- /// Adds a new store to this registry.
- /// If a store of the same prefix existed before, it is replaced in the
registry and returned.
- pub fn register_store(
- &self,
- scheme: String,
- store: Arc<dyn ObjectStore>,
- ) -> Option<Arc<dyn ObjectStore>> {
- let mut stores = self.object_stores.write();
- stores.insert(scheme, store)
- }
-
- /// Get the store registered for scheme
- pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
- let stores = self.object_stores.read();
- stores.get(scheme).cloned()
- }
-
- /// Get a suitable store for the URI based on it's scheme. For example:
- /// - URI with scheme `file://` or no schema will return the default
LocalFS store
- /// - URI with scheme `s3://` will return the S3 store if it's registered
- /// Returns a tuple with the store and the self-described uri of the file
in that store
- pub fn get_by_uri<'a>(
- &self,
- uri: &'a str,
- ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
- if let Some((scheme, path)) = uri.split_once("://") {
- let stores = self.object_stores.read();
- let store = stores
- .get(&*scheme.to_lowercase())
- .map(Clone::clone)
- .ok_or_else(|| {
- DataFusionError::Internal(format!(
- "No suitable object store found for {}",
- scheme
- ))
- })?;
- Ok((store, path))
- } else {
- Ok((Arc::new(LocalFileSystem), uri))
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::ObjectStoreRegistry;
- use datafusion_data_access::object_store::local::LocalFileSystem;
- use std::sync::Arc;
-
- #[test]
- fn test_get_by_uri_s3() {
- let sut = ObjectStoreRegistry::default();
- sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));
- let uri = "s3://bucket/key";
- let (_, path) = sut.get_by_uri(uri).unwrap();
- assert_eq!(path, "bucket/key");
- }
-
- #[test]
- fn test_get_by_uri_file() {
- let sut = ObjectStoreRegistry::default();
- let uri = "file:///bucket/key";
- let (_, path) = sut.get_by_uri(uri).unwrap();
- assert_eq!(path, "/bucket/key");
- }
-
- #[test]
- fn test_get_by_uri_local() {
- let sut = ObjectStoreRegistry::default();
- let uri = "/bucket/key";
- let (_, path) = sut.get_by_uri(uri).unwrap();
- assert_eq!(path, "/bucket/key");
- }
-}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 652834d73..4d579776e 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -57,7 +57,7 @@ use crate::catalog::{
schema::{MemorySchemaProvider, SchemaProvider},
};
use crate::dataframe::DataFrame;
-use crate::datasource::listing::ListingTableConfig;
+use crate::datasource::listing::{ListingTableConfig, ListingTableUrl};
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
@@ -519,26 +519,25 @@ impl SessionContext {
/// Creates a DataFrame for reading an Avro data source.
pub async fn read_avro(
&self,
- uri: impl Into<String>,
+ table_path: impl AsRef<str>,
options: AvroReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
- let uri: String = uri.into();
- let (object_store, path) = self.runtime_env().object_store(&uri)?;
+ let table_path = ListingTableUrl::parse(table_path)?;
+ let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
let listing_options = options.to_listing_options(target_partitions);
- let path: String = path.into();
-
let resolved_schema = match options.schema {
Some(s) => s,
None => {
listing_options
- .infer_schema(Arc::clone(&object_store), &path)
+ .infer_schema(Arc::clone(&object_store), &table_path)
.await?
}
};
- let config = ListingTableConfig::new(object_store, path.clone())
+
+ let config = ListingTableConfig::new(object_store, table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
@@ -548,26 +547,24 @@ impl SessionContext {
/// Creates a DataFrame for reading an Json data source.
pub async fn read_json(
&mut self,
- uri: impl Into<String>,
+ table_path: impl AsRef<str>,
options: NdJsonReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
- let uri: String = uri.into();
- let (object_store, path) = self.runtime_env().object_store(&uri)?;
+ let table_path = ListingTableUrl::parse(table_path)?;
+ let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
let listing_options = options.to_listing_options(target_partitions);
- let path: String = path.into();
-
let resolved_schema = match options.schema {
Some(s) => s,
None => {
listing_options
- .infer_schema(Arc::clone(&object_store), &path)
+ .infer_schema(Arc::clone(&object_store), &table_path)
.await?
}
};
- let config = ListingTableConfig::new(object_store, path)
+ let config = ListingTableConfig::new(object_store, table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
@@ -586,52 +583,47 @@ impl SessionContext {
/// Creates a DataFrame for reading a CSV data source.
pub async fn read_csv(
&self,
- uri: impl Into<String>,
+ table_path: impl AsRef<str>,
options: CsvReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
- let uri: String = uri.into();
- let (object_store, path) = self.runtime_env().object_store(&uri)?;
+ let table_path = ListingTableUrl::parse(table_path)?;
+ let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
- let path = path.to_string();
let listing_options = options.to_listing_options(target_partitions);
let resolved_schema = match options.schema {
Some(s) => Arc::new(s.to_owned()),
None => {
listing_options
- .infer_schema(Arc::clone(&object_store), &path)
+ .infer_schema(Arc::clone(&object_store), &table_path)
.await?
}
};
- let config = ListingTableConfig::new(object_store, path.clone())
+ let config = ListingTableConfig::new(object_store, table_path.clone())
.with_listing_options(listing_options)
.with_schema(resolved_schema);
- let provider = ListingTable::try_new(config)?;
- let plan =
- LogicalPlanBuilder::scan(path,
provider_as_source(Arc::new(provider)), None)?
- .build()?;
- Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ let provider = ListingTable::try_new(config)?;
+ self.read_table(Arc::new(provider))
}
/// Creates a DataFrame for reading a Parquet data source.
pub async fn read_parquet(
&self,
- uri: impl Into<String>,
+ table_path: impl AsRef<str>,
options: ParquetReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
- let uri: String = uri.into();
- let (object_store, path) = self.runtime_env().object_store(&uri)?;
+ let table_path = ListingTableUrl::parse(table_path)?;
+ let object_store = self.runtime_env().object_store(&table_path)?;
let target_partitions = self.copied_config().target_partitions;
let listing_options = options.to_listing_options(target_partitions);
- let path: String = path.into();
// with parquet we resolve the schema in all cases
let resolved_schema = listing_options
- .infer_schema(Arc::clone(&object_store), &path)
+ .infer_schema(Arc::clone(&object_store), &table_path)
.await?;
- let config = ListingTableConfig::new(object_store, path)
+ let config = ListingTableConfig::new(object_store, table_path)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
@@ -651,23 +643,24 @@ impl SessionContext {
/// Registers a table that uses the listing feature of the object store to
    /// find the files to be processed.
/// This is async because it might need to resolve the schema.
- pub async fn register_listing_table<'a>(
- &'a self,
- name: &'a str,
- uri: &'a str,
+ pub async fn register_listing_table(
+ &self,
+ name: &str,
+ table_path: impl AsRef<str>,
options: ListingOptions,
provided_schema: Option<SchemaRef>,
) -> Result<()> {
- let (object_store, path) = self.runtime_env().object_store(uri)?;
+ let table_path = ListingTableUrl::parse(table_path)?;
+ let object_store = self.runtime_env().object_store(&table_path)?;
let resolved_schema = match provided_schema {
None => {
options
- .infer_schema(Arc::clone(&object_store), path)
+ .infer_schema(Arc::clone(&object_store), &table_path)
.await?
}
Some(s) => s,
};
- let config = ListingTableConfig::new(object_store, path)
+ let config = ListingTableConfig::new(object_store, table_path)
.with_listing_options(options)
.with_schema(resolved_schema);
let table = ListingTable::try_new(config)?;
@@ -680,7 +673,7 @@ impl SessionContext {
pub async fn register_csv(
&self,
name: &str,
- uri: &str,
+ table_path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
let listing_options =
@@ -688,7 +681,7 @@ impl SessionContext {
self.register_listing_table(
name,
- uri,
+ table_path,
listing_options,
options.schema.map(|s| Arc::new(s.to_owned())),
)
@@ -702,13 +695,13 @@ impl SessionContext {
pub async fn register_json(
&self,
name: &str,
- uri: &str,
+ table_path: &str,
options: NdJsonReadOptions<'_>,
) -> Result<()> {
let listing_options =
options.to_listing_options(self.copied_config().target_partitions);
- self.register_listing_table(name, uri, listing_options, options.schema)
+ self.register_listing_table(name, table_path, listing_options,
options.schema)
.await?;
Ok(())
}
@@ -718,7 +711,7 @@ impl SessionContext {
pub async fn register_parquet(
&self,
name: &str,
- uri: &str,
+ table_path: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
let (target_partitions, parquet_pruning) = {
@@ -729,7 +722,7 @@ impl SessionContext {
.parquet_pruning(parquet_pruning)
.to_listing_options(target_partitions);
- self.register_listing_table(name, uri, listing_options, None)
+ self.register_listing_table(name, table_path, listing_options, None)
.await?;
Ok(())
}
@@ -739,13 +732,13 @@ impl SessionContext {
pub async fn register_avro(
&self,
name: &str,
- uri: &str,
+ table_path: &str,
options: AvroReadOptions<'_>,
) -> Result<()> {
let listing_options =
options.to_listing_options(self.copied_config().target_partitions);
- self.register_listing_table(name, uri, listing_options, options.schema)
+ self.register_listing_table(name, table_path, listing_options,
options.schema)
.await?;
Ok(())
}
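[editor note] A minimal sketch of what the reworked read path looks like to a
caller after this change; the CSV path is illustrative and a tokio runtime is
assumed:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // table paths are now plain strings that are parsed into a
        // ListingTableUrl internally by read_csv/register_listing_table
        let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
        df.show().await?;
        Ok(())
    }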
diff --git a/datafusion/core/src/execution/runtime_env.rs
b/datafusion/core/src/execution/runtime_env.rs
index 73bbc836e..26d1471a1 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -26,12 +26,13 @@ use crate::{
},
};
-use crate::datasource::object_store_registry::ObjectStoreRegistry;
+use crate::datasource::object_store::ObjectStoreRegistry;
use datafusion_common::DataFusionError;
use datafusion_data_access::object_store::ObjectStore;
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::sync::Arc;
+use url::Url;
#[derive(Clone)]
/// Execution runtime environment.
@@ -100,12 +101,9 @@ impl RuntimeEnv {
}
    /// Retrieves an `ObjectStore` instance by scheme
- pub fn object_store<'a>(
- &self,
- uri: &'a str,
- ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+ pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn
ObjectStore>> {
self.object_store_registry
- .get_by_uri(uri)
+ .get_by_url(url)
.map_err(DataFusionError::from)
}
}
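[editor note] A sketch of the new store lookup based on the signature above,
assuming the default local filesystem store is registered (the table URL here
is illustrative):

    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::error::Result;
    use datafusion::execution::runtime_env::RuntimeEnv;

    fn lookup_store(runtime: &RuntimeEnv) -> Result<()> {
        // a ListingTableUrl can be borrowed as a Url, so it plugs straight
        // into the AsRef<Url>-based accessor introduced in this diff
        let table_path = ListingTableUrl::parse("file:///data/table/")?;
        let _store = runtime.object_store(&table_path)?;
        Ok(())
    }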
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 600e24fb8..e8852fdc1 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -52,11 +52,11 @@
//! .to_string();
//!
//! let expected = vec![
-//! "+---+--------------------------+",
-//! "| a | MIN(tests/example.csv.b) |",
-//! "+---+--------------------------+",
-//! "| 1 | 2 |",
-//! "+---+--------------------------+"
+//! "+---+----------------+",
+//! "| a | MIN(?table?.b) |",
+//! "+---+----------------+",
+//! "| 1 | 2 |",
+//! "+---+----------------+"
//! ];
//!
//! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index 3b2c4515e..f24e1e4e6 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -241,6 +241,7 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
+ use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::expressions::{col, PhysicalSortExpr};
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
@@ -261,6 +262,7 @@ mod tests {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store: TestObjectStore::new_arc(&[("x", 100)]),
+ object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema(),
file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs
b/datafusion/core/src/physical_plan/file_format/avro.rs
index fc56ce1d8..8f4d30be0 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -167,6 +167,7 @@ impl ExecutionPlan for AvroExec {
mod tests {
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
+ use crate::datasource::object_store::ObjectStoreUrl;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Field, Schema};
@@ -188,6 +189,7 @@ mod tests {
let avro_exec = AvroExec::new(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![meta.into()]],
file_schema,
statistics: Statistics::default(),
@@ -241,9 +243,12 @@ mod tests {
async fn avro_exec_missing_column() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let store = Arc::new(LocalFileSystem {}) as _;
+ let object_store = Arc::new(LocalFileSystem {}) as _;
+ let object_store_url = ObjectStoreUrl::local_filesystem();
let meta = local_unpartitioned_file(filename);
- let actual_schema = AvroFormat {}.infer_schema(&store,
&[meta.clone()]).await?;
+ let actual_schema = AvroFormat {}
+ .infer_schema(&object_store, &[meta.clone()])
+ .await?;
let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -253,7 +258,8 @@ mod tests {
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
let avro_exec = AvroExec::new(FileScanConfig {
- object_store: store,
+ object_store,
+ object_store_url,
file_groups: vec![vec![meta.into()]],
file_schema,
statistics: Statistics::default(),
@@ -307,9 +313,12 @@ mod tests {
async fn avro_exec_with_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
- let store = Arc::new(LocalFileSystem {}) as _;
+ let object_store = Arc::new(LocalFileSystem {}) as _;
+ let object_store_url = ObjectStoreUrl::local_filesystem();
let meta = local_unpartitioned_file(filename);
- let file_schema = AvroFormat {}.infer_schema(&store,
&[meta.clone()]).await?;
+ let file_schema = AvroFormat {}
+ .infer_schema(&object_store, &[meta.clone()])
+ .await?;
let mut partitioned_file = PartitionedFile::from(meta);
partitioned_file.partition_values =
@@ -319,7 +328,8 @@ mod tests {
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table
schema.
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
- object_store: store,
+ object_store,
+ object_store_url,
file_groups: vec![vec![partitioned_file]],
file_schema,
statistics: Statistics::default(),
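[editor note] The two ObjectStoreUrl constructors used throughout these tests,
shown together as a small sketch:

    use datafusion::datasource::object_store::ObjectStoreUrl;

    fn example_urls() -> (ObjectStoreUrl, ObjectStoreUrl) {
        // canonical URL for the built-in local filesystem store
        let local = ObjectStoreUrl::local_filesystem();
        // any other scheme known to the ObjectStoreRegistry is referenced by URL
        let test = ObjectStoreUrl::parse("test:///").unwrap();
        (local, test)
    }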
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs
b/datafusion/core/src/physical_plan/file_format/json.rs
index 5470f6d57..3a179a7a2 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -195,6 +195,7 @@ mod tests {
use crate::datafusion_data_access::object_store::local::LocalFileSystem;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
+ use crate::datasource::object_store::ObjectStoreUrl;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
use datafusion_data_access::object_store::local::local_unpartitioned_file;
@@ -205,9 +206,14 @@ mod tests {
const TEST_DATA_BASE: &str = "tests/jsons";
- async fn prepare_store(
- ) -> (Arc<dyn ObjectStore>, Vec<Vec<PartitionedFile>>, SchemaRef) {
+ async fn prepare_store() -> (
+ Arc<dyn ObjectStore>,
+ ObjectStoreUrl,
+ Vec<Vec<PartitionedFile>>,
+ SchemaRef,
+ ) {
let store = Arc::new(LocalFileSystem {}) as _;
+ let store_url = ObjectStoreUrl::local_filesystem();
let path = format!("{}/1.json", TEST_DATA_BASE);
let meta = local_unpartitioned_file(path);
let schema = JsonFormat::default()
@@ -215,7 +221,7 @@ mod tests {
.await
.unwrap();
- (store, vec![vec![meta.into()]], schema)
+ (store, store_url, vec![vec![meta.into()]], schema)
}
#[tokio::test]
@@ -223,10 +229,13 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
- let (object_store, file_groups, file_schema) = prepare_store().await;
+
+ let (object_store, object_store_url, file_groups, file_schema) =
+ prepare_store().await;
let exec = NdJsonExec::new(FileScanConfig {
object_store,
+ object_store_url,
file_groups,
file_schema,
statistics: Statistics::default(),
@@ -280,7 +289,8 @@ mod tests {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
use arrow::datatypes::DataType;
- let (object_store, file_groups, actual_schema) = prepare_store().await;
+ let (object_store, object_store_url, file_groups, actual_schema) =
+ prepare_store().await;
let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -290,6 +300,7 @@ mod tests {
let exec = NdJsonExec::new(FileScanConfig {
object_store,
+ object_store_url,
file_groups,
file_schema,
statistics: Statistics::default(),
@@ -319,10 +330,12 @@ mod tests {
async fn nd_json_exec_file_projection() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- let (object_store, file_groups, file_schema) = prepare_store().await;
+ let (object_store, object_store_url, file_groups, file_schema) =
+ prepare_store().await;
let exec = NdJsonExec::new(FileScanConfig {
object_store,
+ object_store_url,
file_groups,
file_schema,
statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 566b4c8d4..0fed497d3 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -38,7 +38,7 @@ pub use csv::CsvExec;
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;
-use crate::datasource::listing::PartitionedFile;
+use crate::datasource::{listing::PartitionedFile,
object_store::ObjectStoreUrl};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
@@ -68,6 +68,8 @@ lazy_static! {
pub struct FileScanConfig {
/// Store from which the `files` should be fetched
pub object_store: Arc<dyn ObjectStore>,
+ /// Object store URL
+ pub object_store_url: ObjectStoreUrl,
/// Schema before projection. It contains the columns that are expected
/// to be in the files without the table partition columns.
pub file_schema: SchemaRef,
@@ -658,6 +660,7 @@ mod tests {
file_groups: vec![vec![]],
limit: None,
object_store: TestObjectStore::new_arc(&[]),
+ object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
projection,
statistics,
table_partition_cols,
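[editor note] A sketch of building a FileScanConfig with the new
object_store_url field; the field set is taken from the test code in this diff
and the empty file_groups are illustrative:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion::datasource::object_store::ObjectStoreUrl;
    use datafusion::physical_plan::file_format::FileScanConfig;
    use datafusion::physical_plan::Statistics;

    fn empty_scan_config(file_schema: SchemaRef) -> FileScanConfig {
        FileScanConfig {
            object_store: Arc::new(LocalFileSystem {}),
            // new field: identifies which registered store the files belong to
            object_store_url: ObjectStoreUrl::local_filesystem(),
            file_schema,
            file_groups: vec![],
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
        }
    }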
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0931484ef..9eda036a4 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -640,6 +640,7 @@ mod tests {
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::FileRange;
+ use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::Float32Array;
@@ -681,6 +682,7 @@ mod tests {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
file_schema,
statistics: Statistics::default(),
@@ -1067,6 +1069,7 @@ mod tests {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups,
file_schema,
statistics: Statistics::default(),
@@ -1155,6 +1158,7 @@ mod tests {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store: store,
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![partitioned_file]],
file_schema: schema,
statistics: Statistics::default(),
@@ -1214,6 +1218,7 @@ mod tests {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![partitioned_file]],
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index feb0bf322..2a89ea0df 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -288,6 +288,7 @@ pub fn with_new_children_if_necessary(
/// ```
/// use datafusion::prelude::*;
/// use datafusion::physical_plan::displayable;
+/// use std::path::is_separator;
///
/// #[tokio::main]
/// async fn main() {
@@ -310,11 +311,15 @@ pub fn with_new_children_if_necessary(
/// let displayable_plan = displayable(physical_plan.as_ref());
/// let plan_string = format!("{}", displayable_plan.indent());
///
+/// let working_directory = std::env::current_dir().unwrap();
+/// let normalized =
working_directory.to_string_lossy().replace(is_separator, "/");
+/// let plan_string = plan_string.replace(&normalized, "WORKING_DIR");
+///
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=4096\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
-/// \n CsvExec: files=[tests/example.csv],
has_header=true, limit=None, projection=[a]",
+/// \n CsvExec: files=[WORKING_DIR/tests/example.csv],
has_header=true, limit=None, projection=[a]",
/// plan_string.trim());
///
/// let one_line = format!("{}", displayable_plan.one_line());
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 815379de4..19a9db9a4 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,6 +18,7 @@
//! Common unit test utility methods
use crate::arrow::array::UInt32Array;
+use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::from_slice::FromSlice;
@@ -120,6 +121,7 @@ pub fn partitioned_csv_config(
Ok(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema,
file_groups,
statistics: Default::default(),
diff --git a/datafusion/core/src/test/object_store.rs
b/datafusion/core/src/test/object_store.rs
index cc9c02305..95a6f16ed 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -48,7 +48,7 @@ impl TestObjectStore {
#[async_trait]
impl ObjectStore for TestObjectStore {
async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
- let prefix = prefix.to_owned();
+ let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
Ok(Box::pin(
stream::iter(
self.files
diff --git a/datafusion/core/tests/path_partition.rs
b/datafusion/core/tests/path_partition.rs
index 873554747..a7e6ea452 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -20,6 +20,7 @@
use std::{fs, io, sync::Arc};
use async_trait::async_trait;
+use datafusion::datasource::listing::ListingTableUrl;
use datafusion::{
assert_batches_sorted_eq,
datafusion_data_access::{
@@ -182,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "mytable",
+ "file:///mytable",
);
let result = ctx
@@ -218,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "mytable",
+ "file:///mytable",
);
let result = ctx
@@ -255,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
"mytable/date=2021-10-28/file.csv",
],
&["date"],
- "mytable",
+ "file:///mytable",
);
let result = ctx
@@ -419,6 +420,7 @@ fn register_partitioned_aggregate_csv(
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
options.table_partition_cols = partition_cols.iter().map(|&s|
s.to_owned()).collect();
+ let table_path = ListingTableUrl::parse(table_path).unwrap();
let config = ListingTableConfig::new(object_store, table_path)
.with_listing_options(options)
.with_schema(file_schema);
@@ -444,8 +446,12 @@ async fn register_partitioned_alltypes_parquet(
options.table_partition_cols = partition_cols.iter().map(|&s|
s.to_owned()).collect();
options.collect_stat = true;
+ let table_path = ListingTableUrl::parse(format!("mirror:///{}",
table_path)).unwrap();
+ let store_path =
+ ListingTableUrl::parse(format!("mirror:///{}",
store_paths[0])).unwrap();
+
let file_schema = options
- .infer_schema(Arc::clone(&object_store), store_paths[0])
+ .infer_schema(Arc::clone(&object_store), &store_path)
.await
.expect("Parquet schema inference failed");
@@ -487,7 +493,7 @@ impl ObjectStore for MirroringObjectStore {
&self,
prefix: &str,
) -> datafusion_data_access::Result<FileMetaStream> {
- let prefix = prefix.to_owned();
+ let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
let size = self.file_size;
Ok(Box::pin(
stream::iter(
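[editor note] A sketch of registering a partitioned listing table against the
URL-based API, mirroring register_partitioned_aggregate_csv above (the "date"
partition column and "file:///mytable" root are illustrative):

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datafusion_data_access::object_store::ObjectStore;
    use datafusion::datasource::file_format::csv::CsvFormat;
    use datafusion::datasource::listing::{
        ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
    };
    use datafusion::error::Result;

    fn partitioned_csv_table(
        object_store: Arc<dyn ObjectStore>,
        file_schema: SchemaRef,
    ) -> Result<ListingTable> {
        let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
        options.table_partition_cols = vec!["date".to_string()];

        // the table root is now a URL rather than a bare path
        let table_path = ListingTableUrl::parse("file:///mytable")?;
        let config = ListingTableConfig::new(object_store, table_path)
            .with_listing_options(options)
            .with_schema(file_schema);
        ListingTable::try_new(config)
    }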
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 51ddff2fb..947ebc699 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -17,6 +17,7 @@
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::error::Result;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{collect, ExecutionPlan};
@@ -81,21 +82,23 @@ async fn get_exec(
let meta = local_unpartitioned_file(filename);
let format = ParquetFormat::default();
- let store = Arc::new(LocalFileSystem {}) as _;
+ let object_store = Arc::new(LocalFileSystem {}) as _;
+ let object_store_url = ObjectStoreUrl::local_filesystem();
let file_schema = format
- .infer_schema(&store, &[meta.clone()])
+ .infer_schema(&object_store, &[meta.clone()])
.await
.expect("Schema inference");
let statistics = format
- .infer_stats(&store, file_schema.clone(), &meta)
+ .infer_stats(&object_store, file_schema.clone(), &meta)
.await
.expect("Stats inference");
let file_groups = vec![vec![meta.into()]];
let exec = format
.create_physical_plan(
FileScanConfig {
- object_store: store,
+ object_store,
+ object_store_url,
file_schema,
file_groups,
statistics,
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index f72e0f8f8..759edf6fc 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -642,7 +642,7 @@ order by
#[tokio::test]
async fn test_physical_plan_display_indent() {
// Hard code target_partitions as it appears in the RepartitionExec output
- let config = SessionConfig::new().with_target_partitions(3);
+ let config = SessionConfig::new().with_target_partitions(9000);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await.unwrap();
let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
@@ -662,22 +662,21 @@ async fn test_physical_plan_display_indent() {
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 3)",
+ " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 9000)",
" AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < CAST(10 AS Float64)",
- " RepartitionExec: partitioning=RoundRobinBatch(3)",
+ " RepartitionExec:
partitioning=RoundRobinBatch(9000)",
" CsvExec:
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true,
limit=None, projection=[c1, c12]",
];
- let data_path = datafusion::test_util::arrow_test_data();
+ let normalizer = ExplainNormalizer::new();
let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
.trim()
.lines()
// normalize paths
- .map(|s| s.replace(&data_path, "ARROW_TEST_DATA"))
+ .map(|s| normalizer.normalize(s))
.collect::<Vec<_>>();
-
assert_eq!(
expected, actual,
"expected:\n{:#?}\nactual:\n\n{:#?}\n",
@@ -688,7 +687,7 @@ async fn test_physical_plan_display_indent() {
#[tokio::test]
async fn test_physical_plan_display_indent_multi_children() {
// Hard code target_partitions as it appears in the RepartitionExec output
- let config = SessionConfig::new().with_target_partitions(3);
+ let config = SessionConfig::new().with_target_partitions(9000);
let ctx = SessionContext::with_config(config);
// ensure indenting works for nodes with multiple children
register_aggregate_csv(&ctx).await.unwrap();
@@ -708,25 +707,25 @@ async fn
test_physical_plan_display_indent_multi_children() {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: \"c1\", index: 0 }, Column { name: \"c2\", index: 0 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 0 }], 3)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 0 }], 9000)",
" ProjectionExec: expr=[c1@0 as c1]",
" ProjectionExec: expr=[c1@0 as c1]",
- " RepartitionExec: partitioning=RoundRobinBatch(3)",
+ " RepartitionExec: partitioning=RoundRobinBatch(9000)",
" CsvExec:
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true,
limit=None, projection=[c1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c2\",
index: 0 }], 3)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c2\",
index: 0 }], 9000)",
" ProjectionExec: expr=[c2@0 as c2]",
" ProjectionExec: expr=[c1@0 as c2]",
- " RepartitionExec: partitioning=RoundRobinBatch(3)",
+ " RepartitionExec: partitioning=RoundRobinBatch(9000)",
" CsvExec:
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true,
limit=None, projection=[c1]",
];
- let data_path = datafusion::test_util::arrow_test_data();
+ let normalizer = ExplainNormalizer::new();
let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
.trim()
.lines()
// normalize paths
- .map(|s| s.replace(&data_path, "ARROW_TEST_DATA"))
+ .map(|s| normalizer.normalize(s))
.collect::<Vec<_>>();
assert_eq!(
diff --git a/datafusion/core/tests/sql/json.rs
b/datafusion/core/tests/sql/json.rs
index 79deaae79..a74076415 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -96,7 +96,7 @@ async fn json_explain() {
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
- \n JsonExec: limit=None, files=[tests/jsons/2.json]\n",
+ \n JsonExec: limit=None,
files=[WORKING_DIR/tests/jsons/2.json]\n",
],
];
assert_eq!(expected, actual);
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index a13f2ebff..3e19dbcb9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -48,7 +48,9 @@ use datafusion::{execution::context::SessionContext,
physical_plan::displayable}
use datafusion_expr::Volatility;
use std::fs::File;
use std::io::Write;
+use std::path::PathBuf;
use tempfile::TempDir;
+use url::Url;
/// A macro to assert that some particular line contains two substrings
///
@@ -811,25 +813,57 @@ pub fn table_with_sequence(
Ok(Arc::new(MemTable::try_new(schema, partitions)?))
}
-// Normalizes parts of an explain plan that vary from run to run (such as path)
-fn normalize_for_explain(s: &str) -> String {
- // Convert things like
/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
- // to ARROW_TEST_DATA/csv/aggregate_test_100.csv
- let data_path = datafusion::test_util::arrow_test_data();
- let s = s.replace(&data_path, "ARROW_TEST_DATA");
+pub struct ExplainNormalizer {
+ replacements: Vec<(String, String)>,
+}
+
+impl ExplainNormalizer {
+ fn new() -> Self {
+ let mut replacements = vec![];
+
+ let mut push_path = |path: PathBuf, key: &str| {
+ // Push path as is
+ replacements.push((path.to_string_lossy().to_string(),
key.to_string()));
+
+ // Push canonical version of path
+ let canonical = path.canonicalize().unwrap();
+ replacements.push((canonical.to_string_lossy().to_string(),
key.to_string()));
+
+ if cfg!(target_family = "windows") {
+ // Push URL representation of path, to handle windows
+ let url = Url::from_file_path(canonical).unwrap();
+ let path = url.path().strip_prefix('/').unwrap();
+ replacements.push((path.to_string(), key.to_string()));
+ }
+ };
+
+ push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA");
+ push_path(std::env::current_dir().unwrap(), "WORKING_DIR");
- // convert things like partitioning=RoundRobinBatch(16)
- // to partitioning=RoundRobinBatch(NUM_CORES)
- let needle = format!("RoundRobinBatch({})", num_cpus::get());
- s.replace(&needle, "RoundRobinBatch(NUM_CORES)")
+ // convert things like partitioning=RoundRobinBatch(16)
+ // to partitioning=RoundRobinBatch(NUM_CORES)
+ let needle = format!("RoundRobinBatch({})", num_cpus::get());
+ replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string()));
+
+ Self { replacements }
+ }
+
+ fn normalize(&self, s: impl Into<String>) -> String {
+ let mut s = s.into();
+ for (from, to) in &self.replacements {
+ s = s.replace(from, to);
+ }
+ s
+ }
}
/// Applies normalize_for_explain to every line
fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
+ let normalizer = ExplainNormalizer::new();
v.into_iter()
.map(|l| {
l.into_iter()
- .map(|s| normalize_for_explain(&s))
+ .map(|s| normalizer.normalize(s))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
@@ -948,7 +982,7 @@ async fn nyc() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"tripdata",
- "file.csv",
+ "file:///file.csv",
CsvReadOptions::new().schema(&schema),
)
.await?;
diff --git a/datafusion/data-access/Cargo.toml
b/datafusion/data-access/Cargo.toml
index 7bf447f04..8a556f1f3 100644
--- a/datafusion/data-access/Cargo.toml
+++ b/datafusion/data-access/Cargo.toml
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
readme = "README.md"
authors = ["Apache Arrow <[email protected]>"]
license = "Apache-2.0"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
edition = "2021"
rust-version = "1.59"
@@ -36,7 +36,6 @@ path = "src/lib.rs"
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false, features = ["std"] }
futures = "0.3"
-glob = "0.3.0"
parking_lot = "0.12"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync", "fs", "parking_lot"] }
diff --git a/datafusion/data-access/src/object_store/local.rs
b/datafusion/data-access/src/object_store/local.rs
index 604539814..7b0cab7cd 100644
--- a/datafusion/data-access/src/object_store/local.rs
+++ b/datafusion/data-access/src/object_store/local.rs
@@ -317,30 +317,4 @@ mod tests {
Ok(())
}
-
- #[tokio::test]
- async fn test_globbing() -> Result<()> {
- let tmp = tempdir()?;
- let a1_path = tmp.path().join("a1.txt");
- let a2_path = tmp.path().join("a2.txt");
- let b1_path = tmp.path().join("b1.txt");
- File::create(&a1_path)?;
- File::create(&a2_path)?;
- File::create(&b1_path)?;
-
- let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap());
- let mut all_files = HashSet::new();
- let mut files = LocalFileSystem.glob_file(&glob).await?;
- while let Some(file) = files.next().await {
- let file = file?;
- assert_eq!(file.size(), 0);
- all_files.insert(file.path().to_owned());
- }
-
- assert_eq!(all_files.len(), 2);
- assert!(all_files.contains(a1_path.to_str().unwrap()));
- assert!(all_files.contains(a2_path.to_str().unwrap()));
-
- Ok(())
- }
}
diff --git a/datafusion/data-access/src/object_store/mod.rs
b/datafusion/data-access/src/object_store/mod.rs
index 93a930a6d..496a5494f 100644
--- a/datafusion/data-access/src/object_store/mod.rs
+++ b/datafusion/data-access/src/object_store/mod.rs
@@ -21,15 +21,11 @@ pub mod local;
use std::fmt::Debug;
use std::io::Read;
-use std::path;
-use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
-use futures::future::ready;
-use futures::{AsyncRead, Stream, StreamExt, TryStreamExt};
-use glob::Pattern;
+use futures::{AsyncRead, Stream};
use crate::{FileMeta, ListEntry, Result, SizedFile};
@@ -78,54 +74,6 @@ pub trait ObjectStore: Sync + Send + Debug {
/// Returns all the files in path `prefix`
async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
- /// Calls `list_file` with a suffix filter
- async fn list_file_with_suffix(
- &self,
- prefix: &str,
- suffix: &str,
- ) -> Result<FileMetaStream> {
- self.glob_file_with_suffix(prefix, suffix).await
- }
-
- /// Returns all the files matching `glob_pattern`
- async fn glob_file(&self, glob_pattern: &str) -> Result<FileMetaStream> {
- if !contains_glob_start_char(glob_pattern) {
- self.list_file(glob_pattern).await
- } else {
- let normalized_glob_pb = normalize_path(Path::new(glob_pattern));
- let normalized_glob_pattern =
- normalized_glob_pb.as_os_str().to_str().unwrap();
- let start_path =
-
find_longest_search_path_without_glob_pattern(normalized_glob_pattern);
- let file_stream = self.list_file(&start_path).await?;
- let pattern = Pattern::new(normalized_glob_pattern).unwrap();
- Ok(Box::pin(file_stream.filter(move |fr| {
- let matches_pattern = match fr {
- Ok(f) => pattern.matches(f.path()),
- Err(_) => true,
- };
- async move { matches_pattern }
- })))
- }
- }
-
- /// Calls `glob_file` with a suffix filter
- async fn glob_file_with_suffix(
- &self,
- glob_pattern: &str,
- suffix: &str,
- ) -> Result<FileMetaStream> {
- let files_to_consider = match contains_glob_start_char(glob_pattern) {
- true => self.glob_file(glob_pattern).await,
- false => self.list_file(glob_pattern).await,
- }?;
-
- match suffix.is_empty() {
- true => Ok(files_to_consider),
- false => filter_suffix(files_to_consider, suffix),
- }
- }
-
/// Returns all the files in `prefix` if the `prefix` is already a leaf
dir,
/// or all paths between the `prefix` and the first occurrence of the
`delimiter` if it is provided.
async fn list_dir(
@@ -137,144 +85,3 @@ pub trait ObjectStore: Sync + Send + Debug {
/// Get object reader for one file
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
}
-
-/// Normalize a path without requiring it to exist on the filesystem
(path::canonicalize)
-pub fn normalize_path<P: AsRef<Path>>(path: P) -> PathBuf {
- let ends_with_slash = path
- .as_ref()
- .to_str()
- .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR));
- let mut normalized = PathBuf::new();
- for component in path.as_ref().components() {
- match &component {
- Component::ParentDir => {
- if !normalized.pop() {
- normalized.push(component);
- }
- }
- _ => {
- normalized.push(component);
- }
- }
- }
- if ends_with_slash {
- normalized.push("");
- }
- normalized
-}
-
-const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
-
-/// Determine whether the path contains a globbing character
-fn contains_glob_start_char(path: &str) -> bool {
- path.chars().any(|c| GLOB_START_CHARS.contains(&c))
-}
-
-/// Filters the file_stream to only contain files that end with suffix
-fn filter_suffix(file_stream: FileMetaStream, suffix: &str) ->
Result<FileMetaStream> {
- let suffix = suffix.to_owned();
- Ok(Box::pin(
- file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))),
- ))
-}
-
-fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String
{
- // in case the glob_pattern is not actually a glob pattern, take the
entire thing
- if !contains_glob_start_char(glob_pattern) {
- glob_pattern.to_string()
- } else {
- // take all the components of the path (left-to-right) which do not
contain a glob pattern
- let components_in_glob_pattern = Path::new(glob_pattern).components();
- let mut path_buf_for_longest_search_path_without_glob_pattern =
PathBuf::new();
- for component_in_glob_pattern in components_in_glob_pattern {
- let component_as_str =
- component_in_glob_pattern.as_os_str().to_str().unwrap();
- if contains_glob_start_char(component_as_str) {
- break;
- }
- path_buf_for_longest_search_path_without_glob_pattern
- .push(component_in_glob_pattern);
- }
-
- let mut result = path_buf_for_longest_search_path_without_glob_pattern
- .to_str()
- .unwrap()
- .to_string();
-
- // when we're not at the root, append a separator
- if path_buf_for_longest_search_path_without_glob_pattern
- .components()
- .count()
- > 1
- {
- result.push(path::MAIN_SEPARATOR);
- }
- result
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[tokio::test]
- async fn test_is_glob_path() -> Result<()> {
- assert!(!contains_glob_start_char("/"));
- assert!(!contains_glob_start_char("/test"));
- assert!(!contains_glob_start_char("/test/"));
- assert!(contains_glob_start_char("/test*"));
- Ok(())
- }
-
- fn test_longest_base_path(input: &str, expected: &str) {
- assert_eq!(
- find_longest_search_path_without_glob_pattern(input),
- expected,
- "testing find_longest_search_path_without_glob_pattern with {}",
- input
- );
- }
-
- #[tokio::test]
- async fn test_find_longest_search_path_without_glob_pattern() ->
Result<()> {
- // no glob patterns, thus we get the full path (as-is)
- test_longest_base_path("/", "/");
- test_longest_base_path("/a.txt", "/a.txt");
- test_longest_base_path("/a", "/a");
- test_longest_base_path("/a/", "/a/");
- test_longest_base_path("/a/b", "/a/b");
- test_longest_base_path("/a/b/", "/a/b/");
- test_longest_base_path("/a/b.txt", "/a/b.txt");
- test_longest_base_path("/a/b/c.txt", "/a/b/c.txt");
- // glob patterns, thus we build the longest path (os-specific)
- use path::MAIN_SEPARATOR;
- test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}"));
- test_longest_base_path(
- "/a/*b.txt",
- &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
- );
- test_longest_base_path(
- "/a/*/b.txt",
- &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
- );
- test_longest_base_path(
- "/a/b/[123]/file*.txt",
- &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
- );
- test_longest_base_path(
- "/a/b*.txt",
- &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
- );
- test_longest_base_path(
- "/a/b/**/c*.txt",
- &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
- );
- test_longest_base_path(
- &format!("{}/alltypes_plain*.parquet", "/a/b/c//"), //
https://github.com/apache/arrow-datafusion/issues/2465
- &format!(
-
"{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}c{MAIN_SEPARATOR}"
- ),
- );
- Ok(())
- }
-}
diff --git a/datafusion/proto/src/logical_plan.rs
b/datafusion/proto/src/logical_plan.rs
index bf557be88..4eff00e01 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -30,7 +30,7 @@ use datafusion::{
file_format::{
avro::AvroFormat, csv::CsvFormat, parquet::ParquetFormat,
FileFormat,
},
- listing::{ListingOptions, ListingTable, ListingTableConfig},
+ listing::{ListingOptions, ListingTable, ListingTableConfig,
ListingTableUrl},
},
logical_plan::{provider_as_source, source_as_provider},
};
@@ -411,6 +411,7 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Avro(..) =>
Arc::new(AvroFormat::default()),
};
+ let table_path = ListingTableUrl::parse(&scan.path)?;
let options = ListingOptions {
file_extension: scan.file_extension.clone(),
format: file_format,
@@ -419,7 +420,7 @@ impl AsLogicalPlan for LogicalPlanNode {
target_partitions: scan.target_partitions as usize,
};
- let object_store =
ctx.runtime_env().object_store(scan.path.as_str())?.0;
+ let object_store =
ctx.runtime_env().object_store(&table_path)?;
println!(
"Found object store {:?} for path {}",
@@ -427,7 +428,7 @@ impl AsLogicalPlan for LogicalPlanNode {
scan.path.as_str()
);
- let config = ListingTableConfig::new(object_store,
scan.path.as_str())
+ let config = ListingTableConfig::new(object_store, table_path)
.with_listing_options(options)
.with_schema(Arc::new(schema));
@@ -763,7 +764,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.options()
.table_partition_cols
.clone(),
- path: listing_table.table_path().to_owned(),
+ path: listing_table.table_path().to_string(),
schema: Some(schema),
projection,
filters,
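[editor note] A sketch of the string round trip the proto code performs,
assuming ListingTableUrl implements Display as the to_string call above
suggests:

    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::error::Result;

    fn round_trip(table_path: &ListingTableUrl) -> Result<ListingTableUrl> {
        // the protobuf plan carries the table location as a plain string and
        // re-parses it into a ListingTableUrl when the plan is decoded
        let serialized = table_path.to_string();
        ListingTableUrl::parse(serialized)
    }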
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 0c3922627..8c2e687f3 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
# clone the repo
# TODO make repo/branch configurable
-git clone https://github.com/apache/arrow-ballista
+git clone https://github.com/tustvold/arrow-ballista -b url-refactor
# update dependencies to local crates
python ./dev/make-ballista-deps-local.py