This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bd720033e expose table name in proto extension codec (#11139)
3bd720033e is described below

commit 3bd720033e1f1602a637eaa3a367de48a5843b4f
Author: Leonardo Yvens <[email protected]>
AuthorDate: Fri Jun 28 15:17:51 2024 +0100

    expose table name in proto extension codec (#11139)
---
 datafusion/proto/src/logical_plan/file_formats.rs      | 12 +++++++++++-
 datafusion/proto/src/logical_plan/mod.rs               | 14 ++++++++++----
 datafusion/proto/tests/cases/roundtrip_logical_plan.rs | 13 ++++++++++++-
 3 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/datafusion/proto/src/logical_plan/file_formats.rs 
b/datafusion/proto/src/logical_plan/file_formats.rs
index 31102b728e..106d563948 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -24,7 +24,7 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
-use datafusion_common::not_impl_err;
+use datafusion_common::{not_impl_err, TableReference};
 
 use super::LogicalExtensionCodec;
 
@@ -53,6 +53,7 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: arrow::datatypes::SchemaRef,
         _ctx: &datafusion::prelude::SessionContext,
     ) -> datafusion_common::Result<
@@ -63,6 +64,7 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> datafusion_common::Result<()> {
@@ -127,6 +129,7 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: arrow::datatypes::SchemaRef,
         _ctx: &datafusion::prelude::SessionContext,
     ) -> datafusion_common::Result<
@@ -137,6 +140,7 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> datafusion_common::Result<()> {
@@ -201,6 +205,7 @@ impl LogicalExtensionCodec for ParquetLogicalExtensionCodec 
{
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: arrow::datatypes::SchemaRef,
         _ctx: &datafusion::prelude::SessionContext,
     ) -> datafusion_common::Result<
@@ -211,6 +216,7 @@ impl LogicalExtensionCodec for ParquetLogicalExtensionCodec 
{
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> datafusion_common::Result<()> {
@@ -275,6 +281,7 @@ impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: arrow::datatypes::SchemaRef,
         _ctx: &datafusion::prelude::SessionContext,
     ) -> datafusion_common::Result<
@@ -285,6 +292,7 @@ impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> datafusion_common::Result<()> {
@@ -349,6 +357,7 @@ impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: arrow::datatypes::SchemaRef,
         _cts: &datafusion::prelude::SessionContext,
     ) -> datafusion_common::Result<
@@ -359,6 +368,7 @@ impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> datafusion_common::Result<()> {
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index cdb9d5260a..664cd7e115 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -109,12 +109,14 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
     fn try_decode_table_provider(
         &self,
         buf: &[u8],
+        table_ref: &TableReference,
         schema: SchemaRef,
         ctx: &SessionContext,
     ) -> Result<Arc<dyn TableProvider>>;
 
     fn try_encode_table_provider(
         &self,
+        table_ref: &TableReference,
         node: Arc<dyn TableProvider>,
         buf: &mut Vec<u8>,
     ) -> Result<()>;
@@ -164,6 +166,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec 
{
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: SchemaRef,
         _ctx: &SessionContext,
     ) -> Result<Arc<dyn TableProvider>> {
@@ -172,6 +175,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec 
{
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: Arc<dyn TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> Result<()> {
@@ -445,15 +449,17 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .iter()
                     .map(|expr| from_proto::parse_expr(expr, ctx, 
extension_codec))
                     .collect::<Result<Vec<_>, _>>()?;
+
+                let table_name =
+                    from_table_reference(scan.table_name.as_ref(), 
"CustomScan")?;
+
                 let provider = extension_codec.try_decode_table_provider(
                     &scan.custom_table_data,
+                    &table_name,
                     schema,
                     ctx,
                 )?;
 
-                let table_name =
-                    from_table_reference(scan.table_name.as_ref(), 
"CustomScan")?;
-
                 LogicalPlanBuilder::scan_with_filters(
                     table_name,
                     provider_as_source(provider),
@@ -1048,7 +1054,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 } else {
                     let mut bytes = vec![];
                     extension_codec
-                        .try_encode_table_provider(provider, &mut bytes)
+                        .try_encode_table_provider(table_name, provider, &mut 
bytes)
                         .map_err(|e| context!("Error serializing custom 
table", e))?;
                     let scan = CustomScan(CustomTableScanNode {
                         table_name: Some(table_name.clone().into()),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index d54078b72b..fe3da3d058 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -52,7 +52,7 @@ use datafusion_common::config::TableOptions;
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{
     internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, 
DFSchemaRef,
-    DataFusionError, Result, ScalarValue,
+    DataFusionError, Result, ScalarValue, TableReference,
 };
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr::{
@@ -134,6 +134,9 @@ pub struct TestTableProto {
     /// URL of the table root
     #[prost(string, tag = "1")]
     pub url: String,
+    /// Qualified table name
+    #[prost(string, tag = "2")]
+    pub table_name: String,
 }
 
 #[derive(Debug)]
@@ -156,12 +159,14 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
     fn try_decode_table_provider(
         &self,
         buf: &[u8],
+        table_ref: &TableReference,
         schema: SchemaRef,
         _ctx: &SessionContext,
     ) -> Result<Arc<dyn TableProvider>> {
         let msg = TestTableProto::decode(buf).map_err(|_| {
             DataFusionError::Internal("Error decoding test table".to_string())
         })?;
+        assert_eq!(msg.table_name, table_ref.to_string());
         let provider = TestTableProvider {
             url: msg.url,
             schema,
@@ -171,6 +176,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
 
     fn try_encode_table_provider(
         &self,
+        table_ref: &TableReference,
         node: Arc<dyn TableProvider>,
         buf: &mut Vec<u8>,
     ) -> Result<()> {
@@ -181,6 +187,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
             .expect("Can't encode non-test tables");
         let msg = TestTableProto {
             url: table.url.clone(),
+            table_name: table_ref.to_string(),
         };
         msg.encode(buf).map_err(|_| {
             DataFusionError::Internal("Error encoding test table".to_string())
@@ -866,6 +873,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: SchemaRef,
         _ctx: &SessionContext,
     ) -> Result<Arc<dyn TableProvider>> {
@@ -874,6 +882,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: Arc<dyn TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> Result<()> {
@@ -943,6 +952,7 @@ impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
     fn try_decode_table_provider(
         &self,
         _buf: &[u8],
+        _table_ref: &TableReference,
         _schema: SchemaRef,
         _ctx: &SessionContext,
     ) -> Result<Arc<dyn TableProvider>> {
@@ -951,6 +961,7 @@ impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
 
     fn try_encode_table_provider(
         &self,
+        _table_ref: &TableReference,
         _node: Arc<dyn TableProvider>,
         _buf: &mut Vec<u8>,
     ) -> Result<()> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to