This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fdb8fecf0 Extract Listing URI logic into ListingTableUri structure (#2578)
fdb8fecf0 is described below

commit fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon May 30 16:53:42 2022 +0100

    Extract Listing URI logic into ListingTableUri structure (#2578)
    
    * Extract Listing URI logic into ListingTableUri structure
    
    * Fix explain test CPU count sensitivity
    
    * Windows compatibility
    
    * More windows pacification
    
    * Even more windows pacification
    
    * Fix windows doctest
    
    * Add ObjectStoreUrl
    
    * Update ballista pin
    
    * Format
    
    * Clippy
    
    * Consistent naming
    
    * Review feedback
    
    * Fix windows test
---
 benchmarks/src/bin/tpch.rs                         |   4 +-
 datafusion-examples/examples/flight_server.rs      |   6 +-
 datafusion/core/Cargo.toml                         |   3 +
 datafusion/core/benches/sort_limit_query_sql.rs    |   7 +-
 datafusion/core/src/catalog/schema.rs              |  36 ++-
 datafusion/core/src/datasource/file_format/mod.rs  |   2 +
 datafusion/core/src/datasource/listing/helpers.rs  | 202 ++++++--------
 datafusion/core/src/datasource/listing/mod.rs      |   2 +
 datafusion/core/src/datasource/listing/table.rs    |  63 +++--
 datafusion/core/src/datasource/listing/url.rs      | 307 +++++++++++++++++++++
 datafusion/core/src/datasource/mod.rs              |   2 +-
 datafusion/core/src/datasource/object_store.rs     | 218 +++++++++++++++
 .../core/src/datasource/object_store_registry.rs   | 138 ---------
 datafusion/core/src/execution/context.rs           |  87 +++---
 datafusion/core/src/execution/runtime_env.rs       |  10 +-
 datafusion/core/src/lib.rs                         |  10 +-
 .../core/src/physical_optimizer/repartition.rs     |   2 +
 .../core/src/physical_plan/file_format/avro.rs     |  22 +-
 .../core/src/physical_plan/file_format/json.rs     |  25 +-
 .../core/src/physical_plan/file_format/mod.rs      |   5 +-
 .../core/src/physical_plan/file_format/parquet.rs  |   5 +
 datafusion/core/src/physical_plan/mod.rs           |   7 +-
 datafusion/core/src/test/mod.rs                    |   2 +
 datafusion/core/src/test/object_store.rs           |   2 +-
 datafusion/core/tests/path_partition.rs            |  16 +-
 datafusion/core/tests/row.rs                       |  11 +-
 datafusion/core/tests/sql/explain_analyze.rs       |  25 +-
 datafusion/core/tests/sql/json.rs                  |   2 +-
 datafusion/core/tests/sql/mod.rs                   |  58 +++-
 datafusion/data-access/Cargo.toml                  |   3 +-
 datafusion/data-access/src/object_store/local.rs   |  26 --
 datafusion/data-access/src/object_store/mod.rs     | 195 +------------
 datafusion/proto/src/logical_plan.rs               |   9 +-
 dev/build-arrow-ballista.sh                        |   2 +-
 34 files changed, 871 insertions(+), 643 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index ba10d51c2..c46badd64 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -50,6 +50,7 @@ use datafusion::{
 
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;
 use serde::Serialize;
 use structopt::StructOpt;
 
@@ -425,7 +426,8 @@ fn get_table(
         table_partition_cols: vec![],
     };
 
-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
+    let table_path = ListingTableUrl::parse(path)?;
+    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
         .with_listing_options(options)
         .with_schema(schema);
 
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 703cb7025..a3d7c0f56 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use arrow_flight::SchemaAsIpc;
 use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
 use futures::Stream;
 use tonic::transport::Server;
 use tonic::{Request, Response, Status, Streaming};
@@ -68,9 +68,11 @@ impl FlightService for FlightServiceImpl {
         let request = request.into_inner();
 
         let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let table_path =
+            ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
 
         let schema = listing_options
-            .infer_schema(Arc::new(LocalFileSystem {}), &request.path[0])
+            .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
             .await
             .unwrap();
 
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f8170443e..08da53f9c 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", version = "8.0.0" }
 datafusion-row = { path = "../row", version = "8.0.0" }
 datafusion-sql = { path = "../sql", version = "8.0.0" }
 futures = "0.3"
+glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
+itertools = "0.10"
 lazy_static = { version = "^1.4.0" }
 log = "^0.4"
 num-traits = { version = "0.2", optional = true }
@@ -85,6 +87,7 @@ sqlparser = "0.17"
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
 tokio-stream = "0.1"
+url = "2.2"
 uuid = { version = "1.0", features = ["v4"] }
 
 [dev-dependencies]
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs
index faeff6bc9..d1f253a98 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -20,7 +20,9 @@ extern crate criterion;
 use criterion::Criterion;
 use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
 
 use parking_lot::Mutex;
 use std::sync::Arc;
@@ -64,11 +66,12 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
     let testdata = datafusion::test_util::arrow_test_data();
 
     let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+    let table_path = ListingTableUrl::parse(path).unwrap();
 
     // create CSV data source
     let listing_options = ListingOptions::new(Arc::new(CsvFormat::default()));
 
-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), &path)
+    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), table_path)
         .with_listing_options(listing_options)
         .with_schema(schema);
 
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 9ef5c3d67..748cad52b 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -23,8 +23,8 @@ use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::datasource::listing::{ListingTable, ListingTableConfig};
-use crate::datasource::object_store_registry::ObjectStoreRegistry;
+use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
+use crate::datasource::object_store::ObjectStoreRegistry;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use datafusion_data_access::object_store::ObjectStore;
@@ -156,31 +156,33 @@ impl ObjectStoreSchemaProvider {
             .register_store(scheme.into(), object_store)
     }
 
-    /// Retrieves a `ObjectStore` instance by scheme
-    pub fn object_store<'a>(
+    /// Retrieves a `ObjectStore` instance for a given Url
+    pub fn object_store(
         &self,
-        uri: &'a str,
-    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+        url: impl AsRef<url::Url>,
+    ) -> Result<Arc<dyn ObjectStore>> {
         self.object_store_registry
             .lock()
-            .get_by_uri(uri)
+            .get_by_url(url)
             .map_err(DataFusionError::from)
     }
 
     /// If supported by the implementation, adds a new table to this schema by creating a
-    /// `ListingTable` from the provided `uri` and a previously registered `ObjectStore`.
+    /// `ListingTable` from the provided `url` and a previously registered `ObjectStore`.
     /// If a table of the same name existed before, it returns "Table already exists" error.
     pub async fn register_listing_table(
         &self,
         name: &str,
-        uri: &str,
+        table_path: ListingTableUrl,
         config: Option<ListingTableConfig>,
     ) -> Result<()> {
         let config = match config {
             Some(cfg) => cfg,
             None => {
-                let (object_store, _path) = self.object_store(uri)?;
-                ListingTableConfig::new(object_store, uri).infer().await?
+                let object_store = self.object_store(&table_path)?;
+                ListingTableConfig::new(object_store, table_path)
+                    .infer()
+                    .await?
             }
         };
 
@@ -255,6 +257,7 @@ mod tests {
     use crate::datasource::empty::EmptyTable;
     use crate::execution::context::SessionContext;
 
+    use crate::datasource::listing::ListingTableUrl;
     use futures::StreamExt;
 
     #[tokio::test]
@@ -280,12 +283,13 @@ mod tests {
     async fn test_schema_register_listing_table() {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+        let table_path = ListingTableUrl::parse(filename).unwrap();
 
         let schema = ObjectStoreSchemaProvider::new();
         let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
 
         schema
-            .register_listing_table("alltypes_plain", &filename, None)
+            .register_listing_table("alltypes_plain", table_path, None)
             .await
             .unwrap();
 
@@ -338,8 +342,9 @@ mod tests {
                 || file == OsStr::new("alltypes_plain.parquet")
             {
                 let name = path.file_stem().unwrap().to_str().unwrap();
+                let path = ListingTableUrl::parse(&sized_file.path).unwrap();
                 schema
-                    .register_listing_table(name, &sized_file.path, None)
+                    .register_listing_table(name, path, None)
                     .await
                     .unwrap();
             }
@@ -360,17 +365,18 @@ mod tests {
     async fn test_schema_register_same_listing_table() {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+        let table_path = ListingTableUrl::parse(filename).unwrap();
 
         let schema = ObjectStoreSchemaProvider::new();
         let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
 
         schema
-            .register_listing_table("alltypes_plain", &filename, None)
+            .register_listing_table("alltypes_plain", table_path.clone(), None)
             .await
             .unwrap();
 
         schema
-            .register_listing_table("alltypes_plain", &filename, None)
+            .register_listing_table("alltypes_plain", table_path, None)
             .await
             .unwrap();
     }
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 669ed0efd..eae86fa9c 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -85,6 +85,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
 pub(crate) mod test_util {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::object_store::ObjectStoreUrl;
     use datafusion_data_access::object_store::local::{
         local_unpartitioned_file, LocalFileSystem,
     };
@@ -115,6 +116,7 @@ pub(crate) mod test_util {
             .create_physical_plan(
                 FileScanConfig {
                     object_store: store,
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
                     file_schema,
                     file_groups,
                     statistics,
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 11a91f2ee..a26eafabb 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -17,7 +17,6 @@
 
 //! Helper functions for the table implementation
 
-use std::path::{Component, Path};
 use std::sync::Arc;
 
 use arrow::{
@@ -29,11 +28,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use chrono::{TimeZone, Utc};
-use datafusion_common::DataFusionError;
-use futures::{
-    stream::{self},
-    StreamExt, TryStreamExt,
-};
+use futures::{stream::BoxStream, TryStreamExt};
 use log::debug;
 
 use crate::{
@@ -44,7 +39,8 @@ use crate::{
     scalar::ScalarValue,
 };
 
-use super::{PartitionedFile, PartitionedFileStream};
+use super::PartitionedFile;
+use crate::datasource::listing::ListingTableUrl;
 use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile};
 use datafusion_expr::Volatility;
 
@@ -161,94 +157,53 @@ pub fn split_files(
 /// TODO for tables with many files (10k+), it will usually more efficient
 /// to first list the folders relative to the first partition dimension,
 /// prune those, then list only the contain of the remaining folders.
-pub async fn pruned_partition_list(
-    store: &dyn ObjectStore,
-    table_path: &str,
+pub async fn pruned_partition_list<'a>(
+    store: &'a dyn ObjectStore,
+    table_path: &'a ListingTableUrl,
     filters: &[Expr],
-    file_extension: &str,
-    table_partition_cols: &[String],
-) -> Result<PartitionedFileStream> {
+    file_extension: &'a str,
+    table_partition_cols: &'a [String],
+) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
+    let list = table_path.list_all_files(store, file_extension);
+
     // if no partition col => simply list all the files
     if table_partition_cols.is_empty() {
-        return Ok(Box::pin(
-            store
-                .glob_file_with_suffix(table_path, file_extension)
-                .await?
-                .map(|f| {
-                    Ok(PartitionedFile {
-                        partition_values: vec![],
-                        file_meta: f?,
-                        range: None,
-                    })
-                }),
-        ));
+        return Ok(Box::pin(list.map_ok(|object_meta| object_meta.into())));
     }
 
     let applicable_filters: Vec<_> = filters
         .iter()
         .filter(|f| expr_applicable_for_cols(table_partition_cols, f))
         .collect();
-    let stream_path = table_path.to_owned();
+
     if applicable_filters.is_empty() {
         // Parse the partition values while listing all the files
         // Note: We might avoid parsing the partition values if they are not used in any projection,
         // but the cost of parsing will likely be far dominated by the time to fetch the listing from
         // the object store.
-        let table_partition_cols_stream = table_partition_cols.to_vec();
-        Ok(Box::pin(
-            store
-                .glob_file_with_suffix(table_path, file_extension)
-                .await?
-                .filter_map(move |f| {
-                    let stream_path = stream_path.clone();
-                    let table_partition_cols_stream = table_partition_cols_stream.clone();
-                    async move {
-                        let file_meta = match f {
-                            Ok(fm) => fm,
-                            Err(err) => return Some(Err(err)),
-                        };
-                        let parsed_path = parse_partitions_for_path(
-                            &stream_path,
-                            file_meta.path(),
-                            &table_partition_cols_stream,
-                        )
-                        .map(|p| {
-                            p.iter()
-                                .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
-                                .collect()
-                        });
-
-                        parsed_path.map(|partition_values| {
-                            Ok(PartitionedFile {
-                                partition_values,
-                                file_meta,
-                                range: None,
-                            })
-                        })
-                    }
-                }),
-        ))
+        Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
+            let parsed_path = parse_partitions_for_path(
+                table_path,
+                file_meta.path(),
+                table_partition_cols,
+            )
+            .map(|p| {
+                p.iter()
+                    .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
+                    .collect()
+            });
+
+            Ok(parsed_path.map(|partition_values| PartitionedFile {
+                partition_values,
+                file_meta,
+                range: None,
+            }))
+        })))
     } else {
         // parse the partition values and serde them as a RecordBatch to filter them
-        // TODO avoid collecting but have a streaming memory table instead
-        let batches: Vec<RecordBatch> = store
-            .glob_file_with_suffix(table_path, file_extension)
-            .await?
-            // TODO we set an arbitrary high batch size here, it does not matter as we list
-            // all the files anyway. This number will need to be adjusted according to the object
-            // store if we switch to a streaming-stlye pruning of the files. For instance S3 lists
-            // 1000 items at a time so batches of 1000 would be ideal with S3 as store.
-            .chunks(1024)
-            .map(|v| {
-                v.into_iter()
-                    .collect::<datafusion_data_access::Result<Vec<_>>>()
-            })
-            .map_err(DataFusionError::IoError)
-            .map(move |metas| paths_to_batch(table_partition_cols, &stream_path, &metas?))
-            .try_collect()
-            .await?;
-
-        let mem_table = MemTable::try_new(batches[0].schema(), vec![batches])?;
+        let metas: Vec<_> = list.try_collect().await?;
+        let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
+        let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
 
         // Filter the partitions using a local datafusion context
         // TODO having the external context would allow us to resolve `Volatility::Stable`
@@ -260,7 +215,7 @@ pub async fn pruned_partition_list(
         }
         let filtered_batches = df.collect().await?;
 
-        Ok(Box::pin(stream::iter(
+        Ok(Box::pin(futures::stream::iter(
             batches_to_paths(&filtered_batches).into_iter().map(Ok),
         )))
     }
@@ -275,7 +230,7 @@ pub async fn pruned_partition_list(
 /// Note: For the last modified date, this looses precisions higher than millisecond.
 fn paths_to_batch(
     table_partition_cols: &[String],
-    table_path: &str,
+    table_path: &ListingTableUrl,
     metas: &[FileMeta],
 ) -> Result<RecordBatch> {
     let mut key_builder = StringBuilder::new(metas.len());
@@ -373,21 +328,15 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
 /// Extract the partition values for the given `file_path` (in the given `table_path`)
 /// associated to the partitions defined by `table_partition_cols`
 fn parse_partitions_for_path<'a>(
-    table_path: &str,
+    table_path: &ListingTableUrl,
     file_path: &'a str,
     table_partition_cols: &[String],
 ) -> Option<Vec<&'a str>> {
-    let subpath = file_path.strip_prefix(table_path)?;
-
-    // split subpath into components ignoring leading separator if exists
-    let subpath = Path::new(subpath)
-        .components()
-        .filter(|c| !matches!(c, Component::RootDir))
-        .filter_map(|c| c.as_os_str().to_str());
+    let subpath = table_path.strip_prefix(file_path)?;
 
     let mut part_values = vec![];
-    for (path, pn) in subpath.zip(table_partition_cols) {
-        match path.split_once('=') {
+    for (part, pn) in subpath.zip(table_partition_cols) {
+        match part.split_once('=') {
             Some((name, val)) if name == pn => part_values.push(val),
             _ => return None,
         }
@@ -401,6 +350,7 @@ mod tests {
         logical_plan::{case, col, lit},
         test::object_store::TestObjectStore,
     };
+    use futures::StreamExt;
 
     use super::*;
 
@@ -453,7 +403,7 @@ mod tests {
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
             store.as_ref(),
-            "tablepath/",
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
             ".parquet",
             &[String::from("mypartition")],
@@ -476,33 +426,34 @@ mod tests {
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
             store.as_ref(),
-            "tablepath/",
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
             ".parquet",
             &[String::from("mypartition")],
         )
         .await
         .expect("partition pruning failed")
-        .collect::<Vec<_>>()
-        .await;
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
 
         assert_eq!(pruned.len(), 2);
-        let f1 = pruned[0].as_ref().expect("first item not an error");
+        let f1 = &pruned[0];
         assert_eq!(
-            &f1.file_meta.sized_file.path,
+            f1.file_meta.path(),
             "tablepath/mypartition=val1/file.parquet"
         );
         assert_eq!(
             &f1.partition_values,
             &[ScalarValue::Utf8(Some(String::from("val1"))),]
         );
-        let f2 = pruned[1].as_ref().expect("second item not an error");
+        let f2 = &pruned[1];
         assert_eq!(
-            &f2.file_meta.sized_file.path,
+            f2.file_meta.path(),
             "tablepath/mypartition=val1/other=val3/file.parquet"
         );
         assert_eq!(
-            &f2.partition_values,
+            f2.partition_values,
             &[ScalarValue::Utf8(Some(String::from("val1"))),]
         );
     }
@@ -522,20 +473,21 @@ mod tests {
         let filter3 = Expr::eq(col("part2"), col("other"));
         let pruned = pruned_partition_list(
             store.as_ref(),
-            "tablepath/",
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter1, filter2, filter3],
             ".parquet",
             &[String::from("part1"), String::from("part2")],
         )
         .await
         .expect("partition pruning failed")
-        .collect::<Vec<_>>()
-        .await;
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
 
         assert_eq!(pruned.len(), 2);
-        let f1 = pruned[0].as_ref().expect("first item not an error");
+        let f1 = &pruned[0];
         assert_eq!(
-            &f1.file_meta.sized_file.path,
+            f1.file_meta.path(),
             "tablepath/part1=p1v2/part2=p2v1/file1.parquet"
         );
         assert_eq!(
@@ -545,9 +497,9 @@ mod tests {
                 ScalarValue::Utf8(Some(String::from("p2v1")))
             ]
         );
-        let f2 = pruned[1].as_ref().expect("second item not an error");
+        let f2 = &pruned[1];
         assert_eq!(
-            &f2.file_meta.sized_file.path,
+            f2.file_meta.path(),
             "tablepath/part1=p1v2/part2=p2v1/file2.parquet"
         );
         assert_eq!(
@@ -563,12 +515,16 @@ mod tests {
     fn test_parse_partitions_for_path() {
         assert_eq!(
             Some(vec![]),
-            parse_partitions_for_path("bucket/mytable", "bucket/mytable/file.csv", &[])
+            parse_partitions_for_path(
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+                "bucket/mytable/file.csv",
+                &[]
+            )
         );
         assert_eq!(
             None,
             parse_partitions_for_path(
-                "bucket/othertable",
+                &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
                 "bucket/mytable/file.csv",
                 &[]
             )
@@ -576,7 +532,7 @@ mod tests {
         assert_eq!(
             None,
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/file.csv",
                 &[String::from("mypartition")]
             )
@@ -584,7 +540,7 @@ mod tests {
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/file.csv",
                 &[String::from("mypartition")]
             )
@@ -592,7 +548,7 @@ mod tests {
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
-                "bucket/mytable/",
+                &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                 "bucket/mytable/mypartition=v1/file.csv",
                 &[String::from("mypartition")]
             )
@@ -601,7 +557,7 @@ mod tests {
         assert_eq!(
             None,
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/v1/file.csv",
                 &[String::from("mypartition")]
             )
@@ -609,7 +565,7 @@ mod tests {
         assert_eq!(
             Some(vec!["v1", "v2"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition"), String::from("otherpartition")]
             )
@@ -617,7 +573,7 @@ mod tests {
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition")]
             )
@@ -630,7 +586,7 @@ mod tests {
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
-                "bucket\\mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket\\mytable\\mypartition=v1\\file.csv",
                 &[String::from("mypartition")]
             )
@@ -638,7 +594,7 @@ mod tests {
         assert_eq!(
             Some(vec!["v1", "v2"]),
             parse_partitions_for_path(
-                "bucket\\mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket\\mytable\\mypartition=v1\\otherpartition=v2\\file.csv",
                 &[String::from("mypartition"), String::from("otherpartition")]
             )
@@ -664,7 +620,8 @@ mod tests {
             },
         ];
 
-        let batches = paths_to_batch(&[], "mybucket/tablepath", &files)
+        let table_path = ListingTableUrl::parse("file:///mybucket/tablepath").unwrap();
+        let batches = paths_to_batch(&[], &table_path, &files)
             .expect("Serialization of file list to batch failed");
 
         let parsed_files = batches_to_paths(&[batches]);
@@ -698,9 +655,12 @@ mod tests {
             },
         ];
 
-        let batches =
-            paths_to_batch(&[String::from("part1")], "mybucket/tablepath", &files)
-                .expect("Serialization of file list to batch failed");
+        let batches = paths_to_batch(
+            &[String::from("part1")],
+            &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
+            &files,
+        )
+        .expect("Serialization of file list to batch failed");
 
         let parsed_files = batches_to_paths(&[batches]);
         assert_eq!(parsed_files.len(), 2);
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 0f0a7d20e..c11de5f80 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -20,12 +20,14 @@
 
 mod helpers;
 mod table;
+mod url;
 
 use datafusion_common::ScalarValue;
 use datafusion_data_access::{FileMeta, Result, SizedFile};
 use futures::Stream;
 use std::pin::Pin;
 
+pub use self::url::ListingTableUrl;
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 
 /// Stream of files get listed from object store
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 1dceb8b35..34e44971d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,7 +28,9 @@ use crate::datasource::{
         avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
         FileFormat,
     },
-    get_statistics_with_limit, TableProvider, TableType,
+    get_statistics_with_limit,
+    listing::ListingTableUrl,
+    TableProvider, TableType,
 };
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::{
@@ -51,7 +53,7 @@ pub struct ListingTableConfig {
     /// `ObjectStore` that contains the files for the `ListingTable`.
     pub object_store: Arc<dyn ObjectStore>,
     /// Path on the `ObjectStore` for creating `ListingTable`.
-    pub table_path: String,
+    pub table_path: ListingTableUrl,
     /// Optional `SchemaRef` for the to be created `ListingTable`.
     pub file_schema: Option<SchemaRef>,
     /// Optional `ListingOptions` for the to be created `ListingTable`.
@@ -60,13 +62,10 @@ pub struct ListingTableConfig {
 
 impl ListingTableConfig {
     /// Creates new `ListingTableConfig`.  The `SchemaRef` and `ListingOptions` are inferred based on the suffix of the provided `table_path`.
-    pub fn new(
-        object_store: Arc<dyn ObjectStore>,
-        table_path: impl Into<String>,
-    ) -> Self {
+    pub fn new(object_store: Arc<dyn ObjectStore>, table_path: ListingTableUrl) -> Self {
         Self {
             object_store,
-            table_path: table_path.into(),
+            table_path,
             file_schema: None,
             options: None,
         }
@@ -106,18 +105,18 @@ impl ListingTableConfig {
 
     /// Infer `ListingOptions` based on `table_path` suffix.
     pub async fn infer_options(self) -> Result<Self> {
-        let mut files = self.object_store.list_file(&self.table_path).await?;
-        let file = files
+        let file = self
+            .table_path
+            .list_all_files(self.object_store.as_ref(), "")
             .next()
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for 
table".into()))??;
 
-        let tokens: Vec<&str> = file.path().split('.').collect();
-        let file_type = tokens.last().ok_or_else(|| {
+        let file_type = file.path().rsplit('.').next().ok_or_else(|| {
             DataFusionError::Internal("Unable to infer file suffix".into())
         })?;
 
-        let format = ListingTableConfig::infer_format(*file_type)?;
+        let format = ListingTableConfig::infer_format(file_type)?;
 
         let listing_options = ListingOptions {
             format,
@@ -140,7 +139,7 @@ impl ListingTableConfig {
         match self.options {
             Some(options) => {
                 let schema = options
-                    .infer_schema(self.object_store.clone(), 
self.table_path.as_str())
+                    .infer_schema(self.object_store.clone(), &self.table_path)
                     .await?;
 
                 Ok(Self {
@@ -213,10 +212,9 @@ impl ListingOptions {
     pub async fn infer_schema<'a>(
         &'a self,
         store: Arc<dyn ObjectStore>,
-        path: &'a str,
+        table_path: &'a ListingTableUrl,
     ) -> Result<SchemaRef> {
-        let extension = &self.file_extension;
-        let list_stream = store.glob_file_with_suffix(path, extension).await?;
+        let list_stream = table_path.list_all_files(store.as_ref(), 
&self.file_extension);
         let files: Vec<_> = list_stream.try_collect().await?;
         self.format.infer_schema(&store, &files).await
     }
@@ -226,7 +224,7 @@ impl ListingOptions {
 /// or file system listing capability to get the list of files.
 pub struct ListingTable {
     object_store: Arc<dyn ObjectStore>,
-    table_path: String,
+    table_path: ListingTableUrl,
     /// File fields only
     file_schema: SchemaRef,
     /// File fields + partition columns
@@ -276,10 +274,12 @@ impl ListingTable {
     pub fn object_store(&self) -> &Arc<dyn ObjectStore> {
         &self.object_store
     }
+
     /// Get path ref
-    pub fn table_path(&self) -> &str {
+    pub fn table_path(&self) -> &ListingTableUrl {
         &self.table_path
     }
+
     /// Get options ref
     pub fn options(&self) -> &ListingOptions {
         &self.options
@@ -322,6 +322,7 @@ impl TableProvider for ListingTable {
             .create_physical_plan(
                 FileScanConfig {
                     object_store: Arc::clone(&self.object_store),
+                    object_store_url: self.table_path.object_store(),
                     file_schema: Arc::clone(&self.file_schema),
                     file_groups: partitioned_file_lists,
                     statistics,
@@ -369,6 +370,7 @@ impl ListingTable {
         .await?;
 
         // collect the statistics if required by the config
+        // TODO: Collect statistics and schema in single-pass
         let object_store = Arc::clone(&self.object_store);
         let files = file_list.then(move |part_file| {
             let object_store = object_store.clone();
@@ -436,11 +438,12 @@ mod tests {
     async fn load_table_stats_by_default() -> Result<()> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+        let table_path = ListingTableUrl::parse(filename).unwrap();
         let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
         let schema = opt
-            .infer_schema(Arc::new(LocalFileSystem {}), &filename)
+            .infer_schema(Arc::new(LocalFileSystem {}), &table_path)
             .await?;
-        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), 
filename)
+        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), 
table_path)
             .with_listing_options(opt)
             .with_schema(schema);
         let table = ListingTable::try_new(config)?;
@@ -464,9 +467,10 @@ mod tests {
             collect_stat: true,
         };
 
+        let table_path = ListingTableUrl::parse("file:///table/").unwrap();
         let file_schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, 
false)]));
-        let config = ListingTableConfig::new(store, "table/")
+        let config = ListingTableConfig::new(store, table_path)
             .with_listing_options(opt)
             .with_schema(file_schema);
         let table = ListingTable::try_new(config)?;
@@ -504,7 +508,7 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "bucket/key-prefix/",
+            "file:///bucket/key-prefix/",
             12,
             5,
         )
@@ -518,7 +522,7 @@ mod tests {
                 "bucket/key-prefix/file2",
                 "bucket/key-prefix/file3",
             ],
-            "bucket/key-prefix/",
+            "file:///bucket/key-prefix/",
             4,
             4,
         )
@@ -533,14 +537,15 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "bucket/key-prefix/",
+            "file:///bucket/key-prefix/",
             2,
             2,
         )
         .await?;
 
         // no files => no groups
-        assert_list_files_for_scan_grouping(&[], "bucket/key-prefix/", 2, 
0).await?;
+        assert_list_files_for_scan_grouping(&[], "file:///bucket/key-prefix/", 
2, 0)
+            .await?;
 
         // files that don't match the prefix
         assert_list_files_for_scan_grouping(
@@ -549,7 +554,7 @@ mod tests {
                 "bucket/key-prefix/file1",
                 "bucket/other-prefix/roguefile",
             ],
-            "bucket/key-prefix/",
+            "file:///bucket/key-prefix/",
             10,
             2,
         )
@@ -560,7 +565,8 @@ mod tests {
     async fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, name);
-        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), 
filename)
+        let table_path = ListingTableUrl::parse(filename).unwrap();
+        let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), 
table_path)
             .infer()
             .await?;
         let table = ListingTable::try_new(config)?;
@@ -590,7 +596,8 @@ mod tests {
 
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, 
false)]);
 
-        let config = ListingTableConfig::new(mock_store, 
table_prefix.to_owned())
+        let table_path = ListingTableUrl::parse(table_prefix).unwrap();
+        let config = ListingTableConfig::new(mock_store, table_path)
             .with_listing_options(opt)
             .with_schema(Arc::new(schema));
 
diff --git a/datafusion/core/src/datasource/listing/url.rs 
b/datafusion/core/src/datasource/listing/url.rs
new file mode 100644
index 000000000..041a6ab7f
--- /dev/null
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined by [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob File Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it 
will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression 
will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, 
resolved to
+    /// an absolute path on the local filesystem, returning an error if it 
does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and 
used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a 
drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => 
Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
+            }
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this is an absolute path
+    /// on the local filesystem in the OS-specific path representation
+    ///
+    /// For other URLs, this is the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => match cfg!(target_family = "windows") {
+                true => self.url.path().strip_prefix('/').unwrap(),
+                false => self.url.path(),
+            },
+            _ => 
&self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, 
returning
+    /// an iterator of the remaining path segments
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided 
`file_extension`
+    pub(crate) fn list_all_files<'a>(
+        &'a self,
+        store: &'a dyn ObjectStore,
+        file_extension: &'a str,
+    ) -> BoxStream<'a, Result<FileMeta>> {
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match self.strip_prefix(path) {
+                    Some(mut segments) => {
+                        let stripped = segments.join("/");
+                        glob.matches(&stripped)
+                    }
+                    None => false,
+                },
+                None => true,
+            };
+
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
+    }
+
+    /// Returns this [`ListingTableUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+
+    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+    pub fn object_store(&self) -> ObjectStoreUrl {
+        let url = 
&self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+        ObjectStoreUrl::parse(url).unwrap()
+    }
+}
+
+impl AsRef<str> for ListingTableUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, 
returning
+/// `None` if no glob expression found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+    let mut last_separator = 0;
+
+    for (byte_idx, char) in path.char_indices() {
+        if GLOB_START_CHARS.contains(&char) {
+            if last_separator == 0 {
+                return Some((".", path));
+            }
+            return Some(path.split_at(last_separator));
+        }
+
+        if std::path::is_separator(char) {
+            last_separator = byte_idx + char.len_utf8();
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_prefix_path() {
+        let root = std::env::current_dir().unwrap();
+        let root = root.to_string_lossy();
+
+        let url = ListingTableUrl::parse(&root).unwrap();
+        let child = format!("{}/partition/file", root);
+
+        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "file"]);
+    }
+
+    #[test]
+    fn test_prefix_s3() {
+        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+        assert_eq!(url.prefix(), "bucket/foo/bar");
+
+        let path = "bucket/foo/bar/partition/foo.parquet";
+        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
+
+        let path = "other-bucket/foo/bar/partition/foo.parquet";
+        assert!(url.strip_prefix(path).is_none());
+    }
+
+    #[test]
+    fn test_split_glob() {
+        fn test(input: &str, expected: Option<(&str, &str)>) {
+            assert_eq!(
+                split_glob_expression(input),
+                expected,
+                "testing split_glob_expression with {}",
+                input
+            );
+        }
+
+        // no glob patterns
+        test("/", None);
+        test("/a.txt", None);
+        test("/a", None);
+        test("/a/", None);
+        test("/a/b", None);
+        test("/a/b/", None);
+        test("/a/b.txt", None);
+        test("/a/b/c.txt", None);
+        // glob patterns, thus we build the longest path (os-specific)
+        test("*.txt", Some((".", "*.txt")));
+        test("/*.txt", Some(("/", "*.txt")));
+        test("/a/*b.txt", Some(("/a/", "*b.txt")));
+        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
+        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
+        test("/a/b*.txt", Some(("/a/", "b*.txt")));
+        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
+
+        // https://github.com/apache/arrow-datafusion/issues/2465
+        test(
+            "/a/b/c//alltypes_plain*.parquet",
+            Some(("/a/b/c//", "alltypes_plain*.parquet")),
+        );
+    }
+}
diff --git a/datafusion/core/src/datasource/mod.rs 
b/datafusion/core/src/datasource/mod.rs
index f3cc0a04e..65fc2adcb 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -23,7 +23,7 @@ pub mod empty;
 pub mod file_format;
 pub mod listing;
 pub mod memory;
-pub mod object_store_registry;
+pub mod object_store;
 pub mod view;
 
 use futures::Stream;
diff --git a/datafusion/core/src/datasource/object_store.rs 
b/datafusion/core/src/datasource/object_store.rs
new file mode 100644
index 000000000..ac7e1a847
--- /dev/null
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme 
for each store.
+//! This allows the user to extend DataFusion with different storage systems 
such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, 
LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: 
{}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations 
for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ObjectStoreRegistry")
+            .field(
+                "schemes",
+                &self.object_stores.read().keys().collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for ObjectStoreRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// [`LocalFileSystem`] store is registered by default to support reading 
local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store of the same scheme existed before, it is replaced in the 
registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no scheme will return the default 
LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
+        let url = url.as_ref();
+        let store = self.get(url.scheme()).ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "No suitable object store found for {}",
+                url
+            ))
+        })?;
+
+        Ok(store)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::datasource::listing::ListingTableUrl;
+    use datafusion_data_access::object_store::local::LocalFileSystem;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_object_store_url() {
+        let listing = ListingTableUrl::parse("file:///").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "file:///");
+
+        let file = ObjectStoreUrl::parse("file://").unwrap();
+        assert_eq!(file.as_str(), "file:///");
+
+        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "s3://bucket/");
+
+        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
+        assert_eq!(url.as_str(), "s3://bucket/");
+
+        let url = 
ObjectStoreUrl::parse("s3://username:password@host:123").unwrap();
+        assert_eq!(url.as_str(), "s3://username:password@host:123/");
+
+        let err = ObjectStoreUrl::parse("s3://bucket:invalid").unwrap_err();
+        assert_eq!(err.to_string(), "External error: invalid port number");
+
+        let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only 
contain scheme and authority, got: ?");
+
+        let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only 
contain scheme and authority, got: ?foo=bar");
+
+        let err = ObjectStoreUrl::parse("s3://host:123/foo").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only 
contain scheme and authority, got: /foo");
+
+        let err =
+            
ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only 
contain scheme and authority, got: /foo");
+    }
+
+    #[test]
+    fn test_get_by_url_s3() {
+        let sut = ObjectStoreRegistry::default();
+        sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));
+        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
+        sut.get_by_url(&url).unwrap();
+    }
+
+    #[test]
+    fn test_get_by_url_file() {
+        let sut = ObjectStoreRegistry::default();
+        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
+        sut.get_by_url(&url).unwrap();
+    }
+
+    #[test]
+    fn test_get_by_url_local() {
+        let sut = ObjectStoreRegistry::default();
+        let url = ListingTableUrl::parse("../").unwrap();
+        sut.get_by_url(&url).unwrap();
+    }
+}
diff --git a/datafusion/core/src/datasource/object_store_registry.rs 
b/datafusion/core/src/datasource/object_store_registry.rs
deleted file mode 100644
index 70336af02..000000000
--- a/datafusion/core/src/datasource/object_store_registry.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme 
for each store.
-//! This allows the user to extend DataFusion with different storage systems 
such as S3 or HDFS
-//! and query data inside these systems.
-
-use datafusion_common::{DataFusionError, Result};
-use datafusion_data_access::object_store::local::{LocalFileSystem, 
LOCAL_SCHEME};
-use datafusion_data_access::object_store::ObjectStore;
-use parking_lot::RwLock;
-use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
-
-/// Object store registry
-pub struct ObjectStoreRegistry {
-    /// A map from scheme to object store that serve list / read operations 
for the store
-    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
-}
-
-impl fmt::Debug for ObjectStoreRegistry {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_struct("ObjectStoreRegistry")
-            .field(
-                "schemes",
-                &self.object_stores.read().keys().collect::<Vec<_>>(),
-            )
-            .finish()
-    }
-}
-
-impl Default for ObjectStoreRegistry {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl ObjectStoreRegistry {
-    /// Create the registry that object stores can registered into.
-    /// ['LocalFileSystem'] store is registered in by default to support read 
local files natively.
-    pub fn new() -> Self {
-        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
-        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
-
-        Self {
-            object_stores: RwLock::new(map),
-        }
-    }
-
-    /// Adds a new store to this registry.
-    /// If a store of the same prefix existed before, it is replaced in the 
registry and returned.
-    pub fn register_store(
-        &self,
-        scheme: String,
-        store: Arc<dyn ObjectStore>,
-    ) -> Option<Arc<dyn ObjectStore>> {
-        let mut stores = self.object_stores.write();
-        stores.insert(scheme, store)
-    }
-
-    /// Get the store registered for scheme
-    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
-        let stores = self.object_stores.read();
-        stores.get(scheme).cloned()
-    }
-
-    /// Get a suitable store for the URI based on it's scheme. For example:
-    /// - URI with scheme `file://` or no schema will return the default 
LocalFS store
-    /// - URI with scheme `s3://` will return the S3 store if it's registered
-    /// Returns a tuple with the store and the self-described uri of the file 
in that store
-    pub fn get_by_uri<'a>(
-        &self,
-        uri: &'a str,
-    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
-        if let Some((scheme, path)) = uri.split_once("://") {
-            let stores = self.object_stores.read();
-            let store = stores
-                .get(&*scheme.to_lowercase())
-                .map(Clone::clone)
-                .ok_or_else(|| {
-                    DataFusionError::Internal(format!(
-                        "No suitable object store found for {}",
-                        scheme
-                    ))
-                })?;
-            Ok((store, path))
-        } else {
-            Ok((Arc::new(LocalFileSystem), uri))
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::ObjectStoreRegistry;
-    use datafusion_data_access::object_store::local::LocalFileSystem;
-    use std::sync::Arc;
-
-    #[test]
-    fn test_get_by_uri_s3() {
-        let sut = ObjectStoreRegistry::default();
-        sut.register_store("s3".to_string(), Arc::new(LocalFileSystem {}));
-        let uri = "s3://bucket/key";
-        let (_, path) = sut.get_by_uri(uri).unwrap();
-        assert_eq!(path, "bucket/key");
-    }
-
-    #[test]
-    fn test_get_by_uri_file() {
-        let sut = ObjectStoreRegistry::default();
-        let uri = "file:///bucket/key";
-        let (_, path) = sut.get_by_uri(uri).unwrap();
-        assert_eq!(path, "/bucket/key");
-    }
-
-    #[test]
-    fn test_get_by_uri_local() {
-        let sut = ObjectStoreRegistry::default();
-        let uri = "/bucket/key";
-        let (_, path) = sut.get_by_uri(uri).unwrap();
-        assert_eq!(path, "/bucket/key");
-    }
-}
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 652834d73..4d579776e 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -57,7 +57,7 @@ use crate::catalog::{
     schema::{MemorySchemaProvider, SchemaProvider},
 };
 use crate::dataframe::DataFrame;
-use crate::datasource::listing::ListingTableConfig;
+use crate::datasource::listing::{ListingTableConfig, ListingTableUrl};
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{
@@ -519,26 +519,25 @@ impl SessionContext {
     /// Creates a DataFrame for reading an Avro data source.
     pub async fn read_avro(
         &self,
-        uri: impl Into<String>,
+        table_path: impl AsRef<str>,
         options: AvroReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
-        let uri: String = uri.into();
-        let (object_store, path) = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
 
-        let path: String = path.into();
-
         let resolved_schema = match options.schema {
             Some(s) => s,
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &path)
+                    .infer_schema(Arc::clone(&object_store), &table_path)
                     .await?
             }
         };
-        let config = ListingTableConfig::new(object_store, path.clone())
+
+        let config = ListingTableConfig::new(object_store, table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?;
@@ -548,26 +547,24 @@ impl SessionContext {
     /// Creates a DataFrame for reading an Json data source.
     pub async fn read_json(
         &mut self,
-        uri: impl Into<String>,
+        table_path: impl AsRef<str>,
         options: NdJsonReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
-        let uri: String = uri.into();
-        let (object_store, path) = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
 
-        let path: String = path.into();
-
         let resolved_schema = match options.schema {
             Some(s) => s,
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &path)
+                    .infer_schema(Arc::clone(&object_store), &table_path)
                     .await?
             }
         };
-        let config = ListingTableConfig::new(object_store, path)
+        let config = ListingTableConfig::new(object_store, table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?;
@@ -586,52 +583,47 @@ impl SessionContext {
     /// Creates a DataFrame for reading a CSV data source.
     pub async fn read_csv(
         &self,
-        uri: impl Into<String>,
+        table_path: impl AsRef<str>,
         options: CsvReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
-        let uri: String = uri.into();
-        let (object_store, path) = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
-        let path = path.to_string();
         let listing_options = options.to_listing_options(target_partitions);
         let resolved_schema = match options.schema {
             Some(s) => Arc::new(s.to_owned()),
             None => {
                 listing_options
-                    .infer_schema(Arc::clone(&object_store), &path)
+                    .infer_schema(Arc::clone(&object_store), &table_path)
                     .await?
             }
         };
-        let config = ListingTableConfig::new(object_store, path.clone())
+        let config = ListingTableConfig::new(object_store, table_path.clone())
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
-        let provider = ListingTable::try_new(config)?;
 
-        let plan =
-            LogicalPlanBuilder::scan(path, 
provider_as_source(Arc::new(provider)), None)?
-                .build()?;
-        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+        let provider = ListingTable::try_new(config)?;
+        self.read_table(Arc::new(provider))
     }
 
     /// Creates a DataFrame for reading a Parquet data source.
     pub async fn read_parquet(
         &self,
-        uri: impl Into<String>,
+        table_path: impl AsRef<str>,
         options: ParquetReadOptions<'_>,
     ) -> Result<Arc<DataFrame>> {
-        let uri: String = uri.into();
-        let (object_store, path) = self.runtime_env().object_store(&uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
         let listing_options = options.to_listing_options(target_partitions);
-        let path: String = path.into();
 
         // with parquet we resolve the schema in all cases
         let resolved_schema = listing_options
-            .infer_schema(Arc::clone(&object_store), &path)
+            .infer_schema(Arc::clone(&object_store), &table_path)
             .await?;
 
-        let config = ListingTableConfig::new(object_store, path)
+        let config = ListingTableConfig::new(object_store, table_path)
             .with_listing_options(listing_options)
             .with_schema(resolved_schema);
 
@@ -651,23 +643,24 @@ impl SessionContext {
     /// Registers a table that uses the listing feature of the object store to
     /// find the files to be processed
     /// This is async because it might need to resolve the schema.
-    pub async fn register_listing_table<'a>(
-        &'a self,
-        name: &'a str,
-        uri: &'a str,
+    pub async fn register_listing_table(
+        &self,
+        name: &str,
+        table_path: impl AsRef<str>,
         options: ListingOptions,
         provided_schema: Option<SchemaRef>,
     ) -> Result<()> {
-        let (object_store, path) = self.runtime_env().object_store(uri)?;
+        let table_path = ListingTableUrl::parse(table_path)?;
+        let object_store = self.runtime_env().object_store(&table_path)?;
         let resolved_schema = match provided_schema {
             None => {
                 options
-                    .infer_schema(Arc::clone(&object_store), path)
+                    .infer_schema(Arc::clone(&object_store), &table_path)
                     .await?
             }
             Some(s) => s,
         };
-        let config = ListingTableConfig::new(object_store, path)
+        let config = ListingTableConfig::new(object_store, table_path)
             .with_listing_options(options)
             .with_schema(resolved_schema);
         let table = ListingTable::try_new(config)?;
@@ -680,7 +673,7 @@ impl SessionContext {
     pub async fn register_csv(
         &self,
         name: &str,
-        uri: &str,
+        table_path: &str,
         options: CsvReadOptions<'_>,
     ) -> Result<()> {
         let listing_options =
@@ -688,7 +681,7 @@ impl SessionContext {
 
         self.register_listing_table(
             name,
-            uri,
+            table_path,
             listing_options,
             options.schema.map(|s| Arc::new(s.to_owned())),
         )
@@ -702,13 +695,13 @@ impl SessionContext {
     pub async fn register_json(
         &self,
         name: &str,
-        uri: &str,
+        table_path: &str,
         options: NdJsonReadOptions<'_>,
     ) -> Result<()> {
         let listing_options =
             options.to_listing_options(self.copied_config().target_partitions);
 
-        self.register_listing_table(name, uri, listing_options, options.schema)
+        self.register_listing_table(name, table_path, listing_options, 
options.schema)
             .await?;
         Ok(())
     }
@@ -718,7 +711,7 @@ impl SessionContext {
     pub async fn register_parquet(
         &self,
         name: &str,
-        uri: &str,
+        table_path: &str,
         options: ParquetReadOptions<'_>,
     ) -> Result<()> {
         let (target_partitions, parquet_pruning) = {
@@ -729,7 +722,7 @@ impl SessionContext {
             .parquet_pruning(parquet_pruning)
             .to_listing_options(target_partitions);
 
-        self.register_listing_table(name, uri, listing_options, None)
+        self.register_listing_table(name, table_path, listing_options, None)
             .await?;
         Ok(())
     }
@@ -739,13 +732,13 @@ impl SessionContext {
     pub async fn register_avro(
         &self,
         name: &str,
-        uri: &str,
+        table_path: &str,
         options: AvroReadOptions<'_>,
     ) -> Result<()> {
         let listing_options =
             options.to_listing_options(self.copied_config().target_partitions);
 
-        self.register_listing_table(name, uri, listing_options, options.schema)
+        self.register_listing_table(name, table_path, listing_options, 
options.schema)
             .await?;
         Ok(())
     }
diff --git a/datafusion/core/src/execution/runtime_env.rs 
b/datafusion/core/src/execution/runtime_env.rs
index 73bbc836e..26d1471a1 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -26,12 +26,13 @@ use crate::{
     },
 };
 
-use crate::datasource::object_store_registry::ObjectStoreRegistry;
+use crate::datasource::object_store::ObjectStoreRegistry;
 use datafusion_common::DataFusionError;
 use datafusion_data_access::object_store::ObjectStore;
 use std::fmt::{Debug, Formatter};
 use std::path::PathBuf;
 use std::sync::Arc;
+use url::Url;
 
 #[derive(Clone)]
 /// Execution runtime environment.
@@ -100,12 +101,9 @@ impl RuntimeEnv {
     }
 
     /// Retrieves a `ObjectStore` instance by scheme
-    pub fn object_store<'a>(
-        &self,
-        uri: &'a str,
-    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+    pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
         self.object_store_registry
-            .get_by_uri(uri)
+            .get_by_url(url)
             .map_err(DataFusionError::from)
     }
 }
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 600e24fb8..e8852fdc1 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -52,11 +52,11 @@
 //!    .to_string();
 //!
 //! let expected = vec![
-//!     "+---+--------------------------+",
-//!     "| a | MIN(tests/example.csv.b) |",
-//!     "+---+--------------------------+",
-//!     "| 1 | 2                        |",
-//!     "+---+--------------------------+"
+//!     "+---+----------------+",
+//!     "| a | MIN(?table?.b) |",
+//!     "+---+----------------+",
+//!     "| 1 | 2              |",
+//!     "+---+----------------+"
 //! ];
 //!
 //! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 3b2c4515e..f24e1e4e6 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -241,6 +241,7 @@ mod tests {
 
     use super::*;
     use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use crate::physical_plan::expressions::{col, PhysicalSortExpr};
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
@@ -261,6 +262,7 @@ mod tests {
         Arc::new(ParquetExec::new(
             FileScanConfig {
                 object_store: TestObjectStore::new_arc(&[("x", 100)]),
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
                 file_schema: schema(),
                 file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
                 statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs 
b/datafusion/core/src/physical_plan/file_format/avro.rs
index fc56ce1d8..8f4d30be0 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -167,6 +167,7 @@ impl ExecutionPlan for AvroExec {
 mod tests {
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::object_store::ObjectStoreUrl;
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -188,6 +189,7 @@ mod tests {
 
         let avro_exec = AvroExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
+            object_store_url: ObjectStoreUrl::local_filesystem(),
             file_groups: vec![vec![meta.into()]],
             file_schema,
             statistics: Statistics::default(),
@@ -241,9 +243,12 @@ mod tests {
     async fn avro_exec_missing_column() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let store = Arc::new(LocalFileSystem {}) as _;
+        let object_store = Arc::new(LocalFileSystem {}) as _;
+        let object_store_url = ObjectStoreUrl::local_filesystem();
         let meta = local_unpartitioned_file(filename);
-        let actual_schema = AvroFormat {}.infer_schema(&store, 
&[meta.clone()]).await?;
+        let actual_schema = AvroFormat {}
+            .infer_schema(&object_store, &[meta.clone()])
+            .await?;
 
         let mut fields = actual_schema.fields().clone();
         fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -253,7 +258,8 @@ mod tests {
         let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
 
         let avro_exec = AvroExec::new(FileScanConfig {
-            object_store: store,
+            object_store,
+            object_store_url,
             file_groups: vec![vec![meta.into()]],
             file_schema,
             statistics: Statistics::default(),
@@ -307,9 +313,12 @@ mod tests {
     async fn avro_exec_with_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let store = Arc::new(LocalFileSystem {}) as _;
+        let object_store = Arc::new(LocalFileSystem {}) as _;
+        let object_store_url = ObjectStoreUrl::local_filesystem();
         let meta = local_unpartitioned_file(filename);
-        let file_schema = AvroFormat {}.infer_schema(&store, 
&[meta.clone()]).await?;
+        let file_schema = AvroFormat {}
+            .infer_schema(&object_store, &[meta.clone()])
+            .await?;
 
         let mut partitioned_file = PartitionedFile::from(meta);
         partitioned_file.partition_values =
@@ -319,7 +328,8 @@ mod tests {
             // select specific columns of the files as well as the partitioning
             // column which is supposed to be the last column in the table 
schema.
             projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
-            object_store: store,
+            object_store,
+            object_store_url,
             file_groups: vec![vec![partitioned_file]],
             file_schema,
             statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs 
b/datafusion/core/src/physical_plan/file_format/json.rs
index 5470f6d57..3a179a7a2 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -195,6 +195,7 @@ mod tests {
     use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::object_store::ObjectStoreUrl;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
     use datafusion_data_access::object_store::local::local_unpartitioned_file;
@@ -205,9 +206,14 @@ mod tests {
 
     const TEST_DATA_BASE: &str = "tests/jsons";
 
-    async fn prepare_store(
-    ) -> (Arc<dyn ObjectStore>, Vec<Vec<PartitionedFile>>, SchemaRef) {
+    async fn prepare_store() -> (
+        Arc<dyn ObjectStore>,
+        ObjectStoreUrl,
+        Vec<Vec<PartitionedFile>>,
+        SchemaRef,
+    ) {
         let store = Arc::new(LocalFileSystem {}) as _;
+        let store_url = ObjectStoreUrl::local_filesystem();
         let path = format!("{}/1.json", TEST_DATA_BASE);
         let meta = local_unpartitioned_file(path);
         let schema = JsonFormat::default()
@@ -215,7 +221,7 @@ mod tests {
             .await
             .unwrap();
 
-        (store, vec![vec![meta.into()]], schema)
+        (store, store_url, vec![vec![meta.into()]], schema)
     }
 
     #[tokio::test]
@@ -223,10 +229,13 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
-        let (object_store, file_groups, file_schema) = prepare_store().await;
+
+        let (object_store, object_store_url, file_groups, file_schema) =
+            prepare_store().await;
 
         let exec = NdJsonExec::new(FileScanConfig {
             object_store,
+            object_store_url,
             file_groups,
             file_schema,
             statistics: Statistics::default(),
@@ -280,7 +289,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         use arrow::datatypes::DataType;
-        let (object_store, file_groups, actual_schema) = prepare_store().await;
+        let (object_store, object_store_url, file_groups, actual_schema) =
+            prepare_store().await;
 
         let mut fields = actual_schema.fields().clone();
         fields.push(Field::new("missing_col", DataType::Int32, true));
@@ -290,6 +300,7 @@ mod tests {
 
         let exec = NdJsonExec::new(FileScanConfig {
             object_store,
+            object_store_url,
             file_groups,
             file_schema,
             statistics: Statistics::default(),
@@ -319,10 +330,12 @@ mod tests {
     async fn nd_json_exec_file_projection() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let (object_store, file_groups, file_schema) = prepare_store().await;
+        let (object_store, object_store_url, file_groups, file_schema) =
+            prepare_store().await;
 
         let exec = NdJsonExec::new(FileScanConfig {
             object_store,
+            object_store_url,
             file_groups,
             file_schema,
             statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs 
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 566b4c8d4..0fed497d3 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -38,7 +38,7 @@ pub use csv::CsvExec;
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
 
-use crate::datasource::listing::PartitionedFile;
+use crate::datasource::{listing::PartitionedFile, 
object_store::ObjectStoreUrl};
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -68,6 +68,8 @@ lazy_static! {
 pub struct FileScanConfig {
     /// Store from which the `files` should be fetched
     pub object_store: Arc<dyn ObjectStore>,
+    /// Object store URL
+    pub object_store_url: ObjectStoreUrl,
     /// Schema before projection. It contains the columns that are expected
     /// to be in the files without the table partition columns.
     pub file_schema: SchemaRef,
@@ -658,6 +660,7 @@ mod tests {
             file_groups: vec![vec![]],
             limit: None,
             object_store: TestObjectStore::new_arc(&[]),
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
             projection,
             statistics,
             table_partition_cols,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 0931484ef..9eda036a4 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -640,6 +640,7 @@ mod tests {
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::FileRange;
+    use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use arrow::array::Float32Array;
@@ -681,6 +682,7 @@ mod tests {
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
+                object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![file_groups],
                 file_schema,
                 statistics: Statistics::default(),
@@ -1067,6 +1069,7 @@ mod tests {
             let parquet_exec = ParquetExec::new(
                 FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
                     file_groups,
                     file_schema,
                     statistics: Statistics::default(),
@@ -1155,6 +1158,7 @@ mod tests {
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
                 object_store: store,
+                object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: schema,
                 statistics: Statistics::default(),
@@ -1214,6 +1218,7 @@ mod tests {
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
+                object_store_url: ObjectStoreUrl::local_filesystem(),
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: Arc::new(Schema::empty()),
                 statistics: Statistics::default(),
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index feb0bf322..2a89ea0df 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -288,6 +288,7 @@ pub fn with_new_children_if_necessary(
 /// ```
 /// use datafusion::prelude::*;
 /// use datafusion::physical_plan::displayable;
+/// use std::path::is_separator;
 ///
 /// #[tokio::main]
 /// async fn main() {
@@ -310,11 +311,15 @@ pub fn with_new_children_if_necessary(
 ///   let displayable_plan = displayable(physical_plan.as_ref());
 ///   let plan_string = format!("{}", displayable_plan.indent());
 ///
+///   let working_directory = std::env::current_dir().unwrap();
+///   let normalized = 
working_directory.to_string_lossy().replace(is_separator, "/");
+///   let plan_string = plan_string.replace(&normalized, "WORKING_DIR");
+///
 ///   assert_eq!("ProjectionExec: expr=[a@0 as a]\
 ///              \n  CoalesceBatchesExec: target_batch_size=4096\
 ///              \n    FilterExec: a@0 < 5\
 ///              \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
-///              \n        CsvExec: files=[tests/example.csv], 
has_header=true, limit=None, projection=[a]",
+///              \n        CsvExec: files=[WORKING_DIR/tests/example.csv], 
has_header=true, limit=None, projection=[a]",
 ///               plan_string.trim());
 ///
 ///   let one_line = format!("{}", displayable_plan.one_line());
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 815379de4..19a9db9a4 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,6 +18,7 @@
 //! Common unit test utility methods
 
 use crate::arrow::array::UInt32Array;
+use crate::datasource::object_store::ObjectStoreUrl;
 use crate::datasource::{MemTable, TableProvider};
 use crate::error::Result;
 use crate::from_slice::FromSlice;
@@ -120,6 +121,7 @@ pub fn partitioned_csv_config(
 
     Ok(FileScanConfig {
         object_store: Arc::new(LocalFileSystem {}),
+        object_store_url: ObjectStoreUrl::local_filesystem(),
         file_schema: schema,
         file_groups,
         statistics: Default::default(),
diff --git a/datafusion/core/src/test/object_store.rs 
b/datafusion/core/src/test/object_store.rs
index cc9c02305..95a6f16ed 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -48,7 +48,7 @@ impl TestObjectStore {
 #[async_trait]
 impl ObjectStore for TestObjectStore {
     async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
-        let prefix = prefix.to_owned();
+        let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
         Ok(Box::pin(
             stream::iter(
                 self.files
diff --git a/datafusion/core/tests/path_partition.rs 
b/datafusion/core/tests/path_partition.rs
index 873554747..a7e6ea452 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -20,6 +20,7 @@
 use std::{fs, io, sync::Arc};
 
 use async_trait::async_trait;
+use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::{
     assert_batches_sorted_eq,
     datafusion_data_access::{
@@ -182,7 +183,7 @@ async fn csv_filter_with_file_col() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "mytable",
+        "file:///mytable",
     );
 
     let result = ctx
@@ -218,7 +219,7 @@ async fn csv_projection_on_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "mytable",
+        "file:///mytable",
     );
 
     let result = ctx
@@ -255,7 +256,7 @@ async fn csv_grouping_by_partition() -> Result<()> {
             "mytable/date=2021-10-28/file.csv",
         ],
         &["date"],
-        "mytable",
+        "file:///mytable",
     );
 
     let result = ctx
@@ -419,6 +420,7 @@ fn register_partitioned_aggregate_csv(
     let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
     options.table_partition_cols = partition_cols.iter().map(|&s| 
s.to_owned()).collect();
 
+    let table_path = ListingTableUrl::parse(table_path).unwrap();
     let config = ListingTableConfig::new(object_store, table_path)
         .with_listing_options(options)
         .with_schema(file_schema);
@@ -444,8 +446,12 @@ async fn register_partitioned_alltypes_parquet(
     options.table_partition_cols = partition_cols.iter().map(|&s| 
s.to_owned()).collect();
     options.collect_stat = true;
 
+    let table_path = ListingTableUrl::parse(format!("mirror:///{}", 
table_path)).unwrap();
+    let store_path =
+        ListingTableUrl::parse(format!("mirror:///{}", 
store_paths[0])).unwrap();
+
     let file_schema = options
-        .infer_schema(Arc::clone(&object_store), store_paths[0])
+        .infer_schema(Arc::clone(&object_store), &store_path)
         .await
         .expect("Parquet schema inference failed");
 
@@ -487,7 +493,7 @@ impl ObjectStore for MirroringObjectStore {
         &self,
         prefix: &str,
     ) -> datafusion_data_access::Result<FileMetaStream> {
-        let prefix = prefix.to_owned();
+        let prefix = prefix.strip_prefix('/').unwrap_or(prefix).to_string();
         let size = self.file_size;
         Ok(Box::pin(
             stream::iter(
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 51ddff2fb..947ebc699 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -17,6 +17,7 @@
 
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::error::Result;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{collect, ExecutionPlan};
@@ -81,21 +82,23 @@ async fn get_exec(
     let meta = local_unpartitioned_file(filename);
 
     let format = ParquetFormat::default();
-    let store = Arc::new(LocalFileSystem {}) as _;
+    let object_store = Arc::new(LocalFileSystem {}) as _;
+    let object_store_url = ObjectStoreUrl::local_filesystem();
 
     let file_schema = format
-        .infer_schema(&store, &[meta.clone()])
+        .infer_schema(&object_store, &[meta.clone()])
         .await
         .expect("Schema inference");
     let statistics = format
-        .infer_stats(&store, file_schema.clone(), &meta)
+        .infer_stats(&object_store, file_schema.clone(), &meta)
         .await
         .expect("Stats inference");
     let file_groups = vec![vec![meta.into()]];
     let exec = format
         .create_physical_plan(
             FileScanConfig {
-                object_store: store,
+                object_store,
+                object_store_url,
                 file_schema,
                 file_groups,
                 statistics,
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index f72e0f8f8..759edf6fc 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -642,7 +642,7 @@ order by
 #[tokio::test]
 async fn test_physical_plan_display_indent() {
     // Hard code target_partitions as it appears in the RepartitionExec output
-    let config = SessionConfig::new().with_target_partitions(3);
+    let config = SessionConfig::new().with_target_partitions(9000);
     let ctx = SessionContext::with_config(config);
     register_aggregate_csv(&ctx).await.unwrap();
     let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
@@ -662,22 +662,21 @@ async fn test_physical_plan_display_indent() {
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
         "        AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "          CoalesceBatchesExec: target_batch_size=4096",
-        "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 3)",
+        "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 9000)",
         "              AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "                CoalesceBatchesExec: target_batch_size=4096",
         "                  FilterExec: c12@1 < CAST(10 AS Float64)",
-        "                    RepartitionExec: partitioning=RoundRobinBatch(3)",
+        "                    RepartitionExec: 
partitioning=RoundRobinBatch(9000)",
         "                      CsvExec: 
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, 
limit=None, projection=[c1, c12]",
     ];
 
-    let data_path = datafusion::test_util::arrow_test_data();
+    let normalizer = ExplainNormalizer::new();
     let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
         .trim()
         .lines()
         // normalize paths
-        .map(|s| s.replace(&data_path, "ARROW_TEST_DATA"))
+        .map(|s| normalizer.normalize(s))
         .collect::<Vec<_>>();
-
     assert_eq!(
         expected, actual,
         "expected:\n{:#?}\nactual:\n\n{:#?}\n",
@@ -688,7 +687,7 @@ async fn test_physical_plan_display_indent() {
 #[tokio::test]
 async fn test_physical_plan_display_indent_multi_children() {
     // Hard code target_partitions as it appears in the RepartitionExec output
-    let config = SessionConfig::new().with_target_partitions(3);
+    let config = SessionConfig::new().with_target_partitions(9000);
     let ctx = SessionContext::with_config(config);
     // ensure indenting works for nodes with multiple children
     register_aggregate_csv(&ctx).await.unwrap();
@@ -708,25 +707,25 @@ async fn test_physical_plan_display_indent_multi_children() {
         "  CoalesceBatchesExec: target_batch_size=4096",
         "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 0 })]",
         "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 3)",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
         "          ProjectionExec: expr=[c1@0 as c1]",
         "            ProjectionExec: expr=[c1@0 as c1]",
-        "              RepartitionExec: partitioning=RoundRobinBatch(3)",
+        "              RepartitionExec: partitioning=RoundRobinBatch(9000)",
         "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]",
         "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 3)",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)",
         "          ProjectionExec: expr=[c2@0 as c2]",
         "            ProjectionExec: expr=[c1@0 as c2]",
-        "              RepartitionExec: partitioning=RoundRobinBatch(3)",
+        "              RepartitionExec: partitioning=RoundRobinBatch(9000)",
         "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]",
     ];
 
-    let data_path = datafusion::test_util::arrow_test_data();
+    let normalizer = ExplainNormalizer::new();
     let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
         .trim()
         .lines()
         // normalize paths
-        .map(|s| s.replace(&data_path, "ARROW_TEST_DATA"))
+        .map(|s| normalizer.normalize(s))
         .collect::<Vec<_>>();
 
     assert_eq!(
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index 79deaae79..a74076415 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -96,7 +96,7 @@ async fn json_explain() {
             \n    CoalescePartitionsExec\
             \n      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
-            \n          JsonExec: limit=None, files=[tests/jsons/2.json]\n",
+            \n          JsonExec: limit=None, files=[WORKING_DIR/tests/jsons/2.json]\n",
         ],
     ];
     assert_eq!(expected, actual);
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index a13f2ebff..3e19dbcb9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -48,7 +48,9 @@ use datafusion::{execution::context::SessionContext, physical_plan::displayable}
 use datafusion_expr::Volatility;
 use std::fs::File;
 use std::io::Write;
+use std::path::PathBuf;
 use tempfile::TempDir;
+use url::Url;
 
 /// A macro to assert that some particular line contains two substrings
 ///
@@ -811,25 +813,57 @@ pub fn table_with_sequence(
     Ok(Arc::new(MemTable::try_new(schema, partitions)?))
 }
 
-// Normalizes parts of an explain plan that vary from run to run (such as path)
-fn normalize_for_explain(s: &str) -> String {
-    // Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
-    // to ARROW_TEST_DATA/csv/aggregate_test_100.csv
-    let data_path = datafusion::test_util::arrow_test_data();
-    let s = s.replace(&data_path, "ARROW_TEST_DATA");
+pub struct ExplainNormalizer {
+    replacements: Vec<(String, String)>,
+}
+
+impl ExplainNormalizer {
+    fn new() -> Self {
+        let mut replacements = vec![];
+
+        let mut push_path = |path: PathBuf, key: &str| {
+            // Push path as is
+            replacements.push((path.to_string_lossy().to_string(), key.to_string()));
+
+            // Push canonical version of path
+            let canonical = path.canonicalize().unwrap();
+            replacements.push((canonical.to_string_lossy().to_string(), key.to_string()));
+
+            if cfg!(target_family = "windows") {
+                // Push URL representation of path, to handle windows
+                let url = Url::from_file_path(canonical).unwrap();
+                let path = url.path().strip_prefix('/').unwrap();
+                replacements.push((path.to_string(), key.to_string()));
+            }
+        };
+
+        push_path(test_util::arrow_test_data().into(), "ARROW_TEST_DATA");
+        push_path(std::env::current_dir().unwrap(), "WORKING_DIR");
 
-    // convert things like partitioning=RoundRobinBatch(16)
-    // to partitioning=RoundRobinBatch(NUM_CORES)
-    let needle = format!("RoundRobinBatch({})", num_cpus::get());
-    s.replace(&needle, "RoundRobinBatch(NUM_CORES)")
+        // convert things like partitioning=RoundRobinBatch(16)
+        // to partitioning=RoundRobinBatch(NUM_CORES)
+        let needle = format!("RoundRobinBatch({})", num_cpus::get());
+        replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string()));
+
+        Self { replacements }
+    }
+
+    fn normalize(&self, s: impl Into<String>) -> String {
+        let mut s = s.into();
+        for (from, to) in &self.replacements {
+            s = s.replace(from, to);
+        }
+        s
+    }
 }
 
 /// Applies normalize_for_explain to every line
 fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
+    let normalizer = ExplainNormalizer::new();
     v.into_iter()
         .map(|l| {
             l.into_iter()
-                .map(|s| normalize_for_explain(&s))
+                .map(|s| normalizer.normalize(s))
                 .collect::<Vec<_>>()
         })
         .collect::<Vec<_>>()
@@ -948,7 +982,7 @@ async fn nyc() -> Result<()> {
     let ctx = SessionContext::new();
     ctx.register_csv(
         "tripdata",
-        "file.csv",
+        "file:///file.csv",
         CsvReadOptions::new().schema(&schema),
     )
     .await?;
diff --git a/datafusion/data-access/Cargo.toml b/datafusion/data-access/Cargo.toml
index 7bf447f04..8a556f1f3 100644
--- a/datafusion/data-access/Cargo.toml
+++ b/datafusion/data-access/Cargo.toml
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
 readme = "README.md"
 authors = ["Apache Arrow <[email protected]>"]
 license = "Apache-2.0"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
 edition = "2021"
 rust-version = "1.59"
 
@@ -36,7 +36,6 @@ path = "src/lib.rs"
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false, features = ["std"] }
 futures = "0.3"
-glob = "0.3.0"
 parking_lot = "0.12"
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
diff --git a/datafusion/data-access/src/object_store/local.rs b/datafusion/data-access/src/object_store/local.rs
index 604539814..7b0cab7cd 100644
--- a/datafusion/data-access/src/object_store/local.rs
+++ b/datafusion/data-access/src/object_store/local.rs
@@ -317,30 +317,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[tokio::test]
-    async fn test_globbing() -> Result<()> {
-        let tmp = tempdir()?;
-        let a1_path = tmp.path().join("a1.txt");
-        let a2_path = tmp.path().join("a2.txt");
-        let b1_path = tmp.path().join("b1.txt");
-        File::create(&a1_path)?;
-        File::create(&a2_path)?;
-        File::create(&b1_path)?;
-
-        let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap());
-        let mut all_files = HashSet::new();
-        let mut files = LocalFileSystem.glob_file(&glob).await?;
-        while let Some(file) = files.next().await {
-            let file = file?;
-            assert_eq!(file.size(), 0);
-            all_files.insert(file.path().to_owned());
-        }
-
-        assert_eq!(all_files.len(), 2);
-        assert!(all_files.contains(a1_path.to_str().unwrap()));
-        assert!(all_files.contains(a2_path.to_str().unwrap()));
-
-        Ok(())
-    }
 }
diff --git a/datafusion/data-access/src/object_store/mod.rs b/datafusion/data-access/src/object_store/mod.rs
index 93a930a6d..496a5494f 100644
--- a/datafusion/data-access/src/object_store/mod.rs
+++ b/datafusion/data-access/src/object_store/mod.rs
@@ -21,15 +21,11 @@ pub mod local;
 
 use std::fmt::Debug;
 use std::io::Read;
-use std::path;
-use std::path::{Component, Path, PathBuf};
 use std::pin::Pin;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use futures::future::ready;
-use futures::{AsyncRead, Stream, StreamExt, TryStreamExt};
-use glob::Pattern;
+use futures::{AsyncRead, Stream};
 
 use crate::{FileMeta, ListEntry, Result, SizedFile};
 
@@ -78,54 +74,6 @@ pub trait ObjectStore: Sync + Send + Debug {
     /// Returns all the files in path `prefix`
     async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
 
-    /// Calls `list_file` with a suffix filter
-    async fn list_file_with_suffix(
-        &self,
-        prefix: &str,
-        suffix: &str,
-    ) -> Result<FileMetaStream> {
-        self.glob_file_with_suffix(prefix, suffix).await
-    }
-
-    /// Returns all the files matching `glob_pattern`
-    async fn glob_file(&self, glob_pattern: &str) -> Result<FileMetaStream> {
-        if !contains_glob_start_char(glob_pattern) {
-            self.list_file(glob_pattern).await
-        } else {
-            let normalized_glob_pb = normalize_path(Path::new(glob_pattern));
-            let normalized_glob_pattern =
-                normalized_glob_pb.as_os_str().to_str().unwrap();
-            let start_path =
-                find_longest_search_path_without_glob_pattern(normalized_glob_pattern);
-            let file_stream = self.list_file(&start_path).await?;
-            let pattern = Pattern::new(normalized_glob_pattern).unwrap();
-            Ok(Box::pin(file_stream.filter(move |fr| {
-                let matches_pattern = match fr {
-                    Ok(f) => pattern.matches(f.path()),
-                    Err(_) => true,
-                };
-                async move { matches_pattern }
-            })))
-        }
-    }
-
-    /// Calls `glob_file` with a suffix filter
-    async fn glob_file_with_suffix(
-        &self,
-        glob_pattern: &str,
-        suffix: &str,
-    ) -> Result<FileMetaStream> {
-        let files_to_consider = match contains_glob_start_char(glob_pattern) {
-            true => self.glob_file(glob_pattern).await,
-            false => self.list_file(glob_pattern).await,
-        }?;
-
-        match suffix.is_empty() {
-            true => Ok(files_to_consider),
-            false => filter_suffix(files_to_consider, suffix),
-        }
-    }
-
     /// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
     /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided.
     async fn list_dir(
@@ -137,144 +85,3 @@ pub trait ObjectStore: Sync + Send + Debug {
     /// Get object reader for one file
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
 }
-
-/// Normalize a path without requiring it to exist on the filesystem (path::canonicalize)
-pub fn normalize_path<P: AsRef<Path>>(path: P) -> PathBuf {
-    let ends_with_slash = path
-        .as_ref()
-        .to_str()
-        .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR));
-    let mut normalized = PathBuf::new();
-    for component in path.as_ref().components() {
-        match &component {
-            Component::ParentDir => {
-                if !normalized.pop() {
-                    normalized.push(component);
-                }
-            }
-            _ => {
-                normalized.push(component);
-            }
-        }
-    }
-    if ends_with_slash {
-        normalized.push("");
-    }
-    normalized
-}
-
-const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
-
-/// Determine whether the path contains a globbing character
-fn contains_glob_start_char(path: &str) -> bool {
-    path.chars().any(|c| GLOB_START_CHARS.contains(&c))
-}
-
-/// Filters the file_stream to only contain files that end with suffix
-fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result<FileMetaStream> {
-    let suffix = suffix.to_owned();
-    Ok(Box::pin(
-        file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))),
-    ))
-}
-
-fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String {
-    // in case the glob_pattern is not actually a glob pattern, take the entire thing
-    if !contains_glob_start_char(glob_pattern) {
-        glob_pattern.to_string()
-    } else {
-        // take all the components of the path (left-to-right) which do not contain a glob pattern
-        let components_in_glob_pattern = Path::new(glob_pattern).components();
-        let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new();
-        for component_in_glob_pattern in components_in_glob_pattern {
-            let component_as_str =
-                component_in_glob_pattern.as_os_str().to_str().unwrap();
-            if contains_glob_start_char(component_as_str) {
-                break;
-            }
-            path_buf_for_longest_search_path_without_glob_pattern
-                .push(component_in_glob_pattern);
-        }
-
-        let mut result = path_buf_for_longest_search_path_without_glob_pattern
-            .to_str()
-            .unwrap()
-            .to_string();
-
-        // when we're not at the root, append a separator
-        if path_buf_for_longest_search_path_without_glob_pattern
-            .components()
-            .count()
-            > 1
-        {
-            result.push(path::MAIN_SEPARATOR);
-        }
-        result
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn test_is_glob_path() -> Result<()> {
-        assert!(!contains_glob_start_char("/"));
-        assert!(!contains_glob_start_char("/test"));
-        assert!(!contains_glob_start_char("/test/"));
-        assert!(contains_glob_start_char("/test*"));
-        Ok(())
-    }
-
-    fn test_longest_base_path(input: &str, expected: &str) {
-        assert_eq!(
-            find_longest_search_path_without_glob_pattern(input),
-            expected,
-            "testing find_longest_search_path_without_glob_pattern with {}",
-            input
-        );
-    }
-
-    #[tokio::test]
-    async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> {
-        // no glob patterns, thus we get the full path (as-is)
-        test_longest_base_path("/", "/");
-        test_longest_base_path("/a.txt", "/a.txt");
-        test_longest_base_path("/a", "/a");
-        test_longest_base_path("/a/", "/a/");
-        test_longest_base_path("/a/b", "/a/b");
-        test_longest_base_path("/a/b/", "/a/b/");
-        test_longest_base_path("/a/b.txt", "/a/b.txt");
-        test_longest_base_path("/a/b/c.txt", "/a/b/c.txt");
-        // glob patterns, thus we build the longest path (os-specific)
-        use path::MAIN_SEPARATOR;
-        test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}"));
-        test_longest_base_path(
-            "/a/*b.txt",
-            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
-        );
-        test_longest_base_path(
-            "/a/*/b.txt",
-            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
-        );
-        test_longest_base_path(
-            "/a/b/[123]/file*.txt",
-            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
-        );
-        test_longest_base_path(
-            "/a/b*.txt",
-            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
-        );
-        test_longest_base_path(
-            "/a/b/**/c*.txt",
-            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
-        );
-        test_longest_base_path(
-            &format!("{}/alltypes_plain*.parquet", "/a/b/c//"), // https://github.com/apache/arrow-datafusion/issues/2465
-            &format!(
-                "{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}c{MAIN_SEPARATOR}"
-            ),
-        );
-        Ok(())
-    }
-}
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index bf557be88..4eff00e01 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -30,7 +30,7 @@ use datafusion::{
         file_format::{
             avro::AvroFormat, csv::CsvFormat, parquet::ParquetFormat, FileFormat,
         },
-        listing::{ListingOptions, ListingTable, ListingTableConfig},
+        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     logical_plan::{provider_as_source, source_as_provider},
 };
@@ -411,6 +411,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
                     };
 
+                let table_path = ListingTableUrl::parse(&scan.path)?;
                 let options = ListingOptions {
                     file_extension: scan.file_extension.clone(),
                     format: file_format,
@@ -419,7 +420,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     target_partitions: scan.target_partitions as usize,
                 };
 
-                let object_store = ctx.runtime_env().object_store(scan.path.as_str())?.0;
+                let object_store = ctx.runtime_env().object_store(&table_path)?;
 
                 println!(
                     "Found object store {:?} for path {}",
@@ -427,7 +428,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     scan.path.as_str()
                 );
 
-                let config = ListingTableConfig::new(object_store, scan.path.as_str())
+                let config = ListingTableConfig::new(object_store, table_path)
                     .with_listing_options(options)
                     .with_schema(Arc::new(schema));
 
@@ -763,7 +764,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                                     .options()
                                     .table_partition_cols
                                     .clone(),
-                                path: listing_table.table_path().to_owned(),
+                                path: listing_table.table_path().to_string(),
                                 schema: Some(schema),
                                 projection,
                                 filters,
diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh
index 0c3922627..8c2e687f3 100755
--- a/dev/build-arrow-ballista.sh
+++ b/dev/build-arrow-ballista.sh
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
 
 # clone the repo
 # TODO make repo/branch configurable
-git clone https://github.com/apache/arrow-ballista
+git clone https://github.com/tustvold/arrow-ballista -b url-refactor
 
 # update dependencies to local crates
 python ./dev/make-ballista-deps-local.py

Reply via email to