This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5e0b2d0554 fix(datafusion-proto): support serializing/deserializing ArrowFormat tables (#16875)
5e0b2d0554 is described below
commit 5e0b2d0554af3a38bb93ccd305a410c5df7905f2
Author: Colin Marc <[email protected]>
AuthorDate: Sat Jul 26 08:56:33 2025 +0200
fix(datafusion-proto): support serializing/deserializing ArrowFormat tables (#16875)
Fixes #16874
---
.../proto-common/proto/datafusion_common.proto | 2 +
datafusion/proto-common/src/generated/pbjson.rs | 71 +++++++++++++++++++++
datafusion/proto-common/src/generated/prost.rs | 2 +
datafusion/proto/proto/datafusion.proto | 1 +
.../proto/src/generated/datafusion_proto_common.rs | 2 +
datafusion/proto/src/generated/pbjson.rs | 13 ++++
datafusion/proto/src/generated/prost.rs | 7 +-
datafusion/proto/src/lib.rs | 5 +-
datafusion/proto/src/logical_plan/mod.rs | 17 +++--
.../proto/tests/cases/roundtrip_logical_plan.rs | 14 ++++
datafusion/proto/tests/testdata/test.arrow | Bin 0 -> 1842 bytes
11 files changed, 127 insertions(+), 7 deletions(-)
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 81fc9cceb7..8cb2726058 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -55,6 +55,8 @@ message NdJsonFormat {
JsonOptions options = 1;
}
+message ArrowFormat {}
+
message PrimaryKeyConstraint{
repeated uint64 indices = 1;
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index c3b6686df0..f35fd15946 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1,3 +1,74 @@
+impl serde::Serialize for ArrowFormat {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let len = 0;
+ let struct_ser =
serializer.serialize_struct("datafusion_common.ArrowFormat", len)?;
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for ArrowFormat {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ Err(serde::de::Error::unknown_field(value, FIELDS))
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = ArrowFormat;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion_common.ArrowFormat")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<ArrowFormat, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ while map_.next_key::<GeneratedField>()?.is_some() {
+ let _ = map_.next_value::<serde::de::IgnoredAny>()?;
+ }
+ Ok(ArrowFormat {
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion_common.ArrowFormat",
FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for ArrowOptions {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index 411d72af4c..ac4a9ea4be 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -45,6 +45,8 @@ pub struct NdJsonFormat {
#[prost(message, optional, tag = "1")]
pub options: ::core::option::Option<JsonOptions>,
}
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct ArrowFormat {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PrimaryKeyConstraint {
#[prost(uint64, repeated, tag = "1")]
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 85a565c0b2..666a8c7d1f 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -98,6 +98,7 @@ message ListingTableScanNode {
datafusion_common.ParquetFormat parquet = 11;
datafusion_common.AvroFormat avro = 12;
datafusion_common.NdJsonFormat json = 15;
+ datafusion_common.ArrowFormat arrow = 16;
}
repeated SortExprNodeCollection file_sort_order = 13;
}
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 411d72af4c..ac4a9ea4be 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -45,6 +45,8 @@ pub struct NdJsonFormat {
#[prost(message, optional, tag = "1")]
pub options: ::core::option::Option<JsonOptions>,
}
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct ArrowFormat {}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PrimaryKeyConstraint {
#[prost(uint64, repeated, tag = "1")]
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 83f66ec22c..c2e6d8ef59 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -9907,6 +9907,9 @@ impl serde::Serialize for ListingTableScanNode {
listing_table_scan_node::FileFormatType::Json(v) => {
struct_ser.serialize_field("json", v)?;
}
+ listing_table_scan_node::FileFormatType::Arrow(v) => {
+ struct_ser.serialize_field("arrow", v)?;
+ }
}
}
struct_ser.end()
@@ -9939,6 +9942,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
"parquet",
"avro",
"json",
+ "arrow",
];
#[allow(clippy::enum_variant_names)]
@@ -9957,6 +9961,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
Parquet,
Avro,
Json,
+ Arrow,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -9992,6 +9997,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
"parquet" => Ok(GeneratedField::Parquet),
"avro" => Ok(GeneratedField::Avro),
"json" => Ok(GeneratedField::Json),
+ "arrow" => Ok(GeneratedField::Arrow),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -10112,6 +10118,13 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode {
return
Err(serde::de::Error::duplicate_field("json"));
}
file_format_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(listing_table_scan_node::FileFormatType::Json)
+;
+ }
+ GeneratedField::Arrow => {
+ if file_format_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("arrow"));
+ }
+ file_format_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(listing_table_scan_node::FileFormatType::Arrow)
;
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index cbf6b3b2d4..35491366dc 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -123,7 +123,10 @@ pub struct ListingTableScanNode {
pub target_partitions: u32,
#[prost(message, repeated, tag = "13")]
pub file_sort_order: ::prost::alloc::vec::Vec<SortExprNodeCollection>,
- #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11,
12, 15")]
+ #[prost(
+ oneof = "listing_table_scan_node::FileFormatType",
+ tags = "10, 11, 12, 15, 16"
+ )]
pub file_format_type: ::core::option::Option<
listing_table_scan_node::FileFormatType,
>,
@@ -140,6 +143,8 @@ pub mod listing_table_scan_node {
Avro(super::super::datafusion_common::AvroFormat),
#[prost(message, tag = "15")]
Json(super::super::datafusion_common::NdJsonFormat),
+ #[prost(message, tag = "16")]
+ Arrow(super::super::datafusion_common::ArrowFormat),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 2df162f21e..b4d72aa1b6 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -130,8 +130,9 @@ pub mod protobuf {
pub use crate::generated::datafusion::*;
pub use datafusion_proto_common::common::proto_error;
pub use datafusion_proto_common::protobuf_common::{
- ArrowOptions, ArrowType, AvroFormat, AvroOptions, CsvFormat, DfSchema,
- EmptyMessage, Field, JoinSide, NdJsonFormat, ParquetFormat,
ScalarValue, Schema,
+ ArrowFormat, ArrowOptions, ArrowType, AvroFormat, AvroOptions,
CsvFormat,
+ DfSchema, EmptyMessage, Field, JoinSide, NdJsonFormat, ParquetFormat,
+ ScalarValue, Schema,
};
pub use datafusion_proto_common::{FromProtoError, ToProtoError};
}
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 9915d3617f..576a51707c 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -35,6 +35,7 @@ use crate::{
use crate::protobuf::{proto_error, ToProtoError};
use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
use datafusion::datasource::cte_worktable::CteWorkTable;
+use datafusion::datasource::file_format::arrow::ArrowFormat;
#[cfg(feature = "avro")]
use datafusion::datasource::file_format::avro::AvroFormat;
#[cfg(feature = "parquet")]
@@ -439,13 +440,16 @@ impl AsLogicalPlan for LogicalPlanNode {
}
#[cfg_attr(not(feature = "avro"),
allow(unused_variables))]
FileFormatType::Avro(..) => {
- #[cfg(feature = "avro")]
+ #[cfg(feature = "avro")]
{
Arc::new(AvroFormat)
}
#[cfg(not(feature = "avro"))]
panic!("Unable to process avro file since `avro`
feature is not enabled");
}
+ FileFormatType::Arrow(..) => {
+ Arc::new(ArrowFormat)
+ }
};
let table_paths = &scan
@@ -1057,13 +1061,18 @@ impl AsLogicalPlan for LogicalPlanNode {
Some(FileFormatType::Avro(protobuf::AvroFormat
{}))
}
+ if any.is::<ArrowFormat>() {
+ maybe_some_type =
+
Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
+ }
+
if let Some(file_format_type) = maybe_some_type {
file_format_type
} else {
return Err(proto_error(format!(
- "Error converting file format, {:?} is invalid as
a datafusion format.",
- listing_table.options().format
- )));
+ "Error deserializing unknown file format:
{:?}",
+ listing_table.options().format
+ )));
}
};
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 6c51d553fe..170c2675f7 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -28,6 +28,7 @@ use datafusion::datasource::file_format::json::{JsonFormat, JsonFormatFactory};
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
+use datafusion::execution::options::ArrowReadOptions;
use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
use datafusion::optimizer::Optimizer;
use datafusion_common::parsers::CompressionTypeVariant;
@@ -2656,3 +2657,16 @@ async fn roundtrip_custom_listing_tables_schema() -> Result<()> {
assert_eq!(plan, new_plan);
Ok(())
}
+
+#[tokio::test]
+async fn roundtrip_arrow_scan() -> Result<()> {
+ let ctx = SessionContext::new();
+ let plan = ctx
+ .read_arrow("tests/testdata/test.arrow", ArrowReadOptions::default())
+ .await?
+ .into_optimized_plan()?;
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+ Ok(())
+}
diff --git a/datafusion/proto/tests/testdata/test.arrow b/datafusion/proto/tests/testdata/test.arrow
new file mode 100644
index 0000000000..5314d9eea1
Binary files /dev/null and b/datafusion/proto/tests/testdata/test.arrow differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]