This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3bd720033e expose table name in proto extension codec (#11139)
3bd720033e is described below
commit 3bd720033e1f1602a637eaa3a367de48a5843b4f
Author: Leonardo Yvens <[email protected]>
AuthorDate: Fri Jun 28 15:17:51 2024 +0100
expose table name in proto extension codec (#11139)
---
datafusion/proto/src/logical_plan/file_formats.rs | 12 +++++++++++-
datafusion/proto/src/logical_plan/mod.rs | 14 ++++++++++----
datafusion/proto/tests/cases/roundtrip_logical_plan.rs | 13 ++++++++++++-
3 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index 31102b728e..106d563948 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -24,7 +24,7 @@ use datafusion::{
},
prelude::SessionContext,
};
-use datafusion_common::not_impl_err;
+use datafusion_common::{not_impl_err, TableReference};
use super::LogicalExtensionCodec;
@@ -53,6 +53,7 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: arrow::datatypes::SchemaRef,
_ctx: &datafusion::prelude::SessionContext,
) -> datafusion_common::Result<
@@ -63,6 +64,7 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
@@ -127,6 +129,7 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: arrow::datatypes::SchemaRef,
_ctx: &datafusion::prelude::SessionContext,
) -> datafusion_common::Result<
@@ -137,6 +140,7 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
@@ -201,6 +205,7 @@ impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: arrow::datatypes::SchemaRef,
_ctx: &datafusion::prelude::SessionContext,
) -> datafusion_common::Result<
@@ -211,6 +216,7 @@ impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
@@ -275,6 +281,7 @@ impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: arrow::datatypes::SchemaRef,
_ctx: &datafusion::prelude::SessionContext,
) -> datafusion_common::Result<
@@ -285,6 +292,7 @@ impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
@@ -349,6 +357,7 @@ impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: arrow::datatypes::SchemaRef,
_cts: &datafusion::prelude::SessionContext,
) -> datafusion_common::Result<
@@ -359,6 +368,7 @@ impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index cdb9d5260a..664cd7e115 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -109,12 +109,14 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
fn try_decode_table_provider(
&self,
buf: &[u8],
+ table_ref: &TableReference,
schema: SchemaRef,
ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>>;
fn try_encode_table_provider(
&self,
+ table_ref: &TableReference,
node: Arc<dyn TableProvider>,
buf: &mut Vec<u8>,
) -> Result<()>;
@@ -164,6 +166,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: SchemaRef,
_ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>> {
@@ -172,6 +175,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: Arc<dyn TableProvider>,
_buf: &mut Vec<u8>,
) -> Result<()> {
@@ -445,15 +449,17 @@ impl AsLogicalPlan for LogicalPlanNode {
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
+
+ let table_name =
+ from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
+
let provider = extension_codec.try_decode_table_provider(
&scan.custom_table_data,
+ &table_name,
schema,
ctx,
)?;
- let table_name =
- from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
-
LogicalPlanBuilder::scan_with_filters(
table_name,
provider_as_source(provider),
@@ -1048,7 +1054,7 @@ impl AsLogicalPlan for LogicalPlanNode {
} else {
let mut bytes = vec![];
extension_codec
- .try_encode_table_provider(provider, &mut bytes)
+ .try_encode_table_provider(table_name, provider, &mut bytes)
.map_err(|e| context!("Error serializing custom table", e))?;
let scan = CustomScan(CustomTableScanNode {
table_name: Some(table_name.clone().into()),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index d54078b72b..fe3da3d058 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -52,7 +52,7 @@ use datafusion_common::config::TableOptions;
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef,
- DataFusionError, Result, ScalarValue,
+ DataFusionError, Result, ScalarValue, TableReference,
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
@@ -134,6 +134,9 @@ pub struct TestTableProto {
/// URL of the table root
#[prost(string, tag = "1")]
pub url: String,
+ /// Qualified table name
+ #[prost(string, tag = "2")]
+ pub table_name: String,
}
#[derive(Debug)]
@@ -156,12 +159,14 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
fn try_decode_table_provider(
&self,
buf: &[u8],
+ table_ref: &TableReference,
schema: SchemaRef,
_ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>> {
let msg = TestTableProto::decode(buf).map_err(|_| {
DataFusionError::Internal("Error decoding test table".to_string())
})?;
+ assert_eq!(msg.table_name, table_ref.to_string());
let provider = TestTableProvider {
url: msg.url,
schema,
@@ -171,6 +176,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
fn try_encode_table_provider(
&self,
+ table_ref: &TableReference,
node: Arc<dyn TableProvider>,
buf: &mut Vec<u8>,
) -> Result<()> {
@@ -181,6 +187,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
.expect("Can't encode non-test tables");
let msg = TestTableProto {
url: table.url.clone(),
+ table_name: table_ref.to_string(),
};
msg.encode(buf).map_err(|_| {
DataFusionError::Internal("Error encoding test table".to_string())
@@ -866,6 +873,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: SchemaRef,
_ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>> {
@@ -874,6 +882,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: Arc<dyn TableProvider>,
_buf: &mut Vec<u8>,
) -> Result<()> {
@@ -943,6 +952,7 @@ impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
fn try_decode_table_provider(
&self,
_buf: &[u8],
+ _table_ref: &TableReference,
_schema: SchemaRef,
_ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>> {
@@ -951,6 +961,7 @@ impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
fn try_encode_table_provider(
&self,
+ _table_ref: &TableReference,
_node: Arc<dyn TableProvider>,
_buf: &mut Vec<u8>,
) -> Result<()> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]