This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 501acff Allow for multiple input files per table instead of a single file (#519)
501acff is described below
commit 501acfff1b82cda7e4892a57deb3ab2cb83ea2b4
Author: Jeremy Dyer <[email protected]>
AuthorDate: Sat Oct 21 08:47:04 2023 -0400
Allow for multiple input files per table instead of a single file (#519)
---
datafusion/input/location.py | 10 +++++-----
datafusion/tests/test_input.py | 2 +-
src/common/schema.rs | 16 ++++++++--------
3 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/datafusion/input/location.py b/datafusion/input/location.py
index efbc82f..939c7f4 100644
--- a/datafusion/input/location.py
+++ b/datafusion/input/location.py
@@ -16,6 +16,7 @@
# under the License.
import os
+import glob
from typing import Any
from datafusion.common import DataTypeMap, SqlTable
@@ -41,14 +42,12 @@ class LocationInputPlugin(BaseInputSource):
format = extension.lstrip(".").lower()
num_rows = 0 # Total number of rows in the file. Used for statistics
columns = []
-
if format == "parquet":
import pyarrow.parquet as pq
# Read the Parquet metadata
metadata = pq.read_metadata(input_file)
num_rows = metadata.num_rows
-
# Iterate through the schema and build the SqlTable
for col in metadata.schema:
columns.append(
@@ -57,7 +56,6 @@ class LocationInputPlugin(BaseInputSource):
DataTypeMap.from_parquet_type_str(col.physical_type),
)
)
-
elif format == "csv":
import csv
@@ -73,7 +71,6 @@ class LocationInputPlugin(BaseInputSource):
print(header_row)
for _ in reader:
num_rows += 1
-
# TODO: Need to actually consume this row into resonable columns
raise RuntimeError(
"TODO: Currently unable to support CSV input files."
@@ -84,4 +81,7 @@ class LocationInputPlugin(BaseInputSource):
Only Parquet and CSV."
)
- return SqlTable(table_name, columns, num_rows, input_file)
+ # Input could possibly be multiple files. Create a list if so
+ input_files = glob.glob(input_file)
+
+ return SqlTable(table_name, columns, num_rows, input_files)
diff --git a/datafusion/tests/test_input.py b/datafusion/tests/test_input.py
index 1e2ef41..5b1decf 100644
--- a/datafusion/tests/test_input.py
+++ b/datafusion/tests/test_input.py
@@ -30,4 +30,4 @@ def test_location_input():
tbl = location_input.build_table(input_file, table_name)
assert "blog" == tbl.name
assert 3 == len(tbl.columns)
- assert "blogs.parquet" in tbl.filepath
+ assert "blogs.parquet" in tbl.filepaths[0]
diff --git a/src/common/schema.rs b/src/common/schema.rs
index a003d0c..77b0ce2 100644
--- a/src/common/schema.rs
+++ b/src/common/schema.rs
@@ -56,7 +56,7 @@ pub struct SqlTable {
#[pyo3(get, set)]
pub statistics: SqlStatistics,
#[pyo3(get, set)]
- pub filepath: Option<String>,
+ pub filepaths: Option<Vec<String>>,
}
#[pymethods]
@@ -66,7 +66,7 @@ impl SqlTable {
table_name: String,
columns: Vec<(String, DataTypeMap)>,
row_count: f64,
- filepath: Option<String>,
+ filepaths: Option<Vec<String>>,
) -> Self {
Self {
name: table_name,
@@ -76,7 +76,7 @@ impl SqlTable {
indexes: Vec::new(),
constraints: Vec::new(),
statistics: SqlStatistics::new(row_count),
- filepath,
+ filepaths,
}
}
}
@@ -124,7 +124,7 @@ impl SqlSchema {
pub struct SqlTableSource {
schema: SchemaRef,
statistics: Option<SqlStatistics>,
- filepath: Option<String>,
+ filepaths: Option<Vec<String>>,
}
impl SqlTableSource {
@@ -132,12 +132,12 @@ impl SqlTableSource {
pub fn new(
schema: SchemaRef,
statistics: Option<SqlStatistics>,
- filepath: Option<String>,
+ filepaths: Option<Vec<String>>,
) -> Self {
Self {
schema,
statistics,
- filepath,
+ filepaths,
}
}
@@ -148,8 +148,8 @@ impl SqlTableSource {
/// Access optional filepath associated with this table source
#[allow(dead_code)]
- pub fn filepath(&self) -> Option<&String> {
- self.filepath.as_ref()
+ pub fn filepaths(&self) -> Option<&Vec<String>> {
+ self.filepaths.as_ref()
}
}