This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 2d94b74 fix: handle schema retrieval for datafusion api (#187)
2d94b74 is described below
commit 2d94b74936f62ea32feb0ef0204098d41fd55bf2
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Nov 19 21:46:48 2024 -1000
fix: handle schema retrieval for datafusion api (#187)
---
crates/datafusion/src/lib.rs | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 739cc90..cd82220 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -23,7 +23,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::thread;
-use arrow_schema::SchemaRef;
+use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::datasource::listing::PartitionedFile;
@@ -86,7 +86,8 @@ impl TableProvider for HudiDataSource {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { table.get_schema().await })
});
- SchemaRef::from(handle.join().unwrap().unwrap())
+ let result = handle.join().unwrap().unwrap_or_else(|_| Schema::empty());
+ SchemaRef::from(result)
}
fn table_type(&self) -> TableType {
@@ -212,7 +213,7 @@ mod tests {
use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_tests::TestTable::{
- V6ComplexkeygenHivestyle, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
+ V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
V6TimebasedkeygenNonhivestyle,
};
@@ -231,6 +232,16 @@ mod tests {
assert_eq!(hudi.get_input_partitions(), 0)
}
+ #[tokio::test]
+ async fn test_get_empty_schema_from_empty_table() {
+ let table_provider =
+ HudiDataSource::new_with_options(V6Empty.path().as_str(), empty_options())
+ .await
+ .unwrap();
+ let schema = table_provider.schema();
+ assert!(schema.fields().is_empty());
+ }
+
async fn register_test_table_with_session<I, K, V>(
test_table: &TestTable,
options: I,
@@ -345,7 +356,7 @@ mod tests {
}
#[tokio::test]
- async fn datafusion_read_hudi_table() {
+ async fn test_datafusion_read_hudi_table() {
for (test_table, use_sql, planned_input_partitions) in &[
(V6ComplexkeygenHivestyle, true, 2),
(V6Nonpartitioned, true, 1),
@@ -386,7 +397,7 @@ mod tests {
}
#[tokio::test]
- async fn datafusion_read_hudi_table_with_replacecommits() {
+ async fn test_datafusion_read_hudi_table_with_replacecommits() {
for (test_table, use_sql, planned_input_partitions) in
&[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
{