Re: [PR] POC: Parse to Merge Logical Plan [datafusion]
github-actions[bot] closed pull request #15862: POC: Parse to Merge Logical Plan URL: https://github.com/apache/datafusion/pull/15862 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] POC: Parse to Merge Logical Plan [datafusion]
github-actions[bot] commented on PR #15862: URL: https://github.com/apache/datafusion/pull/15862#issuecomment-3034196404 Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
Re: [PR] POC: Parse to Merge Logical Plan [datafusion]
JanKaul commented on code in PR #15862:
URL: https://github.com/apache/datafusion/pull/15862#discussion_r2072332676
##
datafusion/expr/src/logical_plan/dml.rs:
##
@@ -241,3 +241,10 @@ fn make_count_schema() -> DFSchemaRef {
.unwrap(),
)
}
+
Review Comment:
Actually, the join makes sense. You'll probably always need to perform it.
I think one issue we're going to run into is that you will need to somehow
fork the join node: you will need one stream of record batches for the rows
whose flags indicate a match, and then you will need to reuse the same join
node to get a second stream of record batches for the rows that don't match.
This is currently not well supported by DataFusion's execution model. Once
you consume the results of a node, you can't reuse them anywhere else.
The CASE expression is a bit unclear to me, but maybe I just need to try to
understand it better. I thought it would be easier to keep the match clauses
separate, so that the later implementation can handle them more easily.
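One way to sidestep forking the join node, sketched here only as an illustration: consume the joined output once and route every row into a per-branch buffer based on its flags. This is plain in-memory Rust, not DataFusion's execution API; the `JoinedRow` type and `split_matched` function are hypothetical, and a real streaming operator would have to buffer (or tag) rows itself rather than collect them into `Vec`s.

```rust
/// Hypothetical stand-in for one row of the joined output. Each flag is
/// `None` when that side of the full outer join had no row (a NULL flag).
#[derive(Debug, Clone, PartialEq)]
struct JoinedRow {
    id: i64,
    target_exists: Option<bool>,
    source_exists: Option<bool>,
}

/// Consume the joined rows exactly once and route each row into one of two
/// buffers (matched / not matched), instead of re-running the join per branch.
fn split_matched<I>(rows: I) -> (Vec<JoinedRow>, Vec<JoinedRow>)
where
    I: IntoIterator<Item = JoinedRow>,
{
    rows.into_iter()
        .partition(|row| row.target_exists.is_some() && row.source_exists.is_some())
}

fn main() {
    let rows = vec![
        JoinedRow { id: 1, target_exists: Some(true), source_exists: Some(true) },
        JoinedRow { id: 2, target_exists: None, source_exists: Some(true) },
        JoinedRow { id: 3, target_exists: Some(true), source_exists: None },
    ];
    let (matched, not_matched) = split_matched(rows);
    // prints "matched: [1]"
    println!("matched: {:?}", matched.iter().map(|r| r.id).collect::<Vec<_>>());
    // prints "not matched: [2, 3]"
    println!("not matched: {:?}", not_matched.iter().map(|r| r.id).collect::<Vec<_>>());
}
```

The trade-off is memory: both branches are held at once, which is exactly what a shared, re-readable join output would avoid.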
Re: [PR] POC: Parse to Merge Logical Plan [datafusion]
jonathanc-n commented on code in PR #15862:
URL: https://github.com/apache/datafusion/pull/15862#discussion_r2071905742
##
datafusion/expr/src/logical_plan/dml.rs:
##
@@ -241,3 +241,10 @@ fn make_count_schema() -> DFSchemaRef {
.unwrap(),
)
}
+
Review Comment:
Yes, I haven't finished the pull request yet. Currently, the
implementation converts the table factors into scans that are then
combined into a join; the extra flag columns are used for matching later
on, when branching into the INSERTs, UPDATEs, and DELETEs. Do you think
just passing along a scan instead of fully converting it into a join would
be better?
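A minimal sketch of what the flag columns buy after the full outer join, in plain Rust rather than DataFusion. It assumes an equi-join on a hypothetical integer `id` key (the PR passes the ON expression as a join filter instead) and uses `Option` for NULL; `FlaggedRow` and `full_outer_join` are illustrative names only. The point is that a NULL value column cannot tell "no row on this side" apart from "row holding a genuine NULL", while a constant `true` flag that the join pads with NULL can.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// One output row of a full outer join on `id` (hypothetical layout).
/// `*_value` is `None` both for a genuine NULL and for a missing side;
/// only the `*_exists` flags tell the two cases apart.
#[derive(Debug, PartialEq)]
struct FlaggedRow {
    id: i64,
    target_value: Option<String>,
    source_value: Option<String>,
    target_exists: Option<bool>,
    source_exists: Option<bool>,
}

/// Full outer join of two keyed tables whose values may be NULL. Each side
/// contributes a constant `true` flag, padded with NULL (`None`) when that
/// side has no row for the key.
fn full_outer_join(
    target: &BTreeMap<i64, Option<String>>,
    source: &BTreeMap<i64, Option<String>>,
) -> Vec<FlaggedRow> {
    // Union of keys from both sides, in sorted order.
    let keys: BTreeSet<i64> = target.keys().chain(source.keys()).copied().collect();
    keys.into_iter()
        .map(|id| FlaggedRow {
            id,
            target_value: target.get(&id).cloned().flatten(),
            source_value: source.get(&id).cloned().flatten(),
            target_exists: target.contains_key(&id).then_some(true),
            source_exists: source.contains_key(&id).then_some(true),
        })
        .collect()
}

fn main() {
    // Target row 2 exists but holds a NULL value.
    let target = BTreeMap::from([(1, Some("a".to_string())), (2, None)]);
    let source = BTreeMap::from([(2, Some("b".to_string())), (3, Some("c".to_string()))]);
    for row in full_outer_join(&target, &source) {
        println!("{row:?}");
    }
}
```

For `id = 2` and `id = 3`, `target_value` is `None` in both rows, but only `id = 3` has `target_exists == None`, so only that row is unmatched on the target side.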
Re: [PR] POC: Parse to Merge Logical Plan [datafusion]
JanKaul commented on code in PR #15862:
URL: https://github.com/apache/datafusion/pull/15862#discussion_r2071316933
##
datafusion/expr/src/logical_plan/dml.rs:
##
@@ -241,3 +241,10 @@ fn make_count_schema() -> DFSchemaRef {
.unwrap(),
)
}
+
Review Comment:
Am I missing something, or are you not using this struct in your code?
Generally, I would argue that this struct should be more general:
essentially a conversion from the AST representation into DataFusion
concepts, without performing the actual MERGE logic. This would mean
converting the `TableFactor`s into `LogicalPlan::TableScan`s and the
`MergeClause`s into DataFusion structs containing the expressions.
If the struct is kept general, the actual MERGE logic can then be
performed when planning the physical plan. Since DataFusion doesn't natively
support UPDATEs and DELETEs, this leaves more room for extensions to provide
this functionality.
But that's just my point of view.
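A rough sketch of the more general struct suggested above, in std-only Rust. All names (`MergePlan`, `MergeClausePlan`, `MatchKind`, `MergeAction`) are hypothetical, and `String`s stand in for DataFusion's `LogicalPlan` scans and `Expr`s. The SQL planner would only fill this in; deciding which rows each clause applies to, and performing the UPDATE/DELETE side effects, would be left to the physical planner or an extension.

```rust
/// Which rows a WHEN clause applies to (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MatchKind {
    Matched,
    NotMatchedByTarget,
    NotMatchedBySource,
}

/// The action of one WHEN clause; expressions are kept as SQL strings here,
/// standing in for converted DataFusion `Expr`s.
#[derive(Debug, Clone, PartialEq)]
enum MergeAction {
    Update { assignments: Vec<(String, String)> },
    Delete,
    Insert { columns: Vec<String>, values: Vec<String> },
}

#[derive(Debug, Clone, PartialEq)]
struct MergeClausePlan {
    kind: MatchKind,
    predicate: Option<String>, // optional `AND <cond>` of the WHEN clause
    action: MergeAction,
}

/// Planner output: inputs and clauses converted, MERGE semantics deferred.
#[derive(Debug)]
struct MergePlan {
    target: String,                // placeholder for the target table scan
    source: String,                // placeholder for the source table scan
    on: String,                    // placeholder for the converted ON expression
    clauses: Vec<MergeClausePlan>, // one entry per WHEN, in SQL order
}

impl MergePlan {
    /// Clauses of one kind, in the order they were written.
    fn clauses_for(&self, kind: MatchKind) -> impl Iterator<Item = &MergeClausePlan> + '_ {
        self.clauses.iter().filter(move |c| c.kind == kind)
    }
}

fn main() {
    let plan = MergePlan {
        target: "target".into(),
        source: "source".into(),
        on: "target.id = source.id".into(),
        clauses: vec![
            MergeClausePlan {
                kind: MatchKind::Matched,
                predicate: Some("source.deleted".into()),
                action: MergeAction::Delete,
            },
            MergeClausePlan {
                kind: MatchKind::Matched,
                predicate: None,
                action: MergeAction::Update { assignments: vec![("v".into(), "source.v".into())] },
            },
            MergeClausePlan {
                kind: MatchKind::NotMatchedByTarget,
                predicate: None,
                action: MergeAction::Insert {
                    columns: vec!["id".into(), "v".into()],
                    values: vec!["source.id".into(), "source.v".into()],
                },
            },
        ],
    };
    println!("MERGE INTO {} USING {} ON {}", plan.target, plan.source, plan.on);
    for kind in [MatchKind::Matched, MatchKind::NotMatchedByTarget, MatchKind::NotMatchedBySource] {
        println!("{kind:?}: {} clause(s)", plan.clauses_for(kind).count());
    }
}
```

Keeping each WHEN clause as its own entry, in SQL order, lets the later stage apply the first clause whose condition holds for a row without first decoding a single combined CASE expression.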
Re: [PR] POC: Parse to Merge Logical Plan [datafusion]
jonathanc-n commented on PR #15862: URL: https://github.com/apache/datafusion/pull/15862#issuecomment-2831915545 PTAL @jayzhan211 @universalmind303
Re: [PR] POC: Parse to Merge Logical Plan [datafusion]
jonathanc-n commented on code in PR #15862:
URL: https://github.com/apache/datafusion/pull/15862#discussion_r2061054598
##
datafusion/sql/src/statement.rs:
##
@@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         Ok(plan)
     }
+    fn merge_to_plan(
+        &self,
+        source_table: TableFactor,
+        target_table: TableFactor,
+        on: Box<SQLExpr>,
+        clauses: Vec<MergeClause>,
+    ) -> Result<LogicalPlan> {
+        let mut ctx = PlannerContext::new();
+
+        let target_name: ObjectName;
+        let source_name: ObjectName;
+
+        match target_table {
+            TableFactor::Table {
+                name,
+                alias,
+                args,
+                with_hints,
+                version,
+                with_ordinality,
+                partitions,
+                json_path,
+                sample,
+                index_hints,
+            } => {
+                target_name = name;
+            }
+            _ => return plan_err!("Target table can only be a table for MERGE."),
+        }
+
+        let target_ref = self.object_name_to_table_reference(target_name)?;
+        let target_src = self.context_provider.get_table_source(target_ref.clone())?;
+        let target_scan =
+            LogicalPlanBuilder::scan(target_ref.clone(), Arc::clone(&target_src), None)?
+                .project(projected_columns, lit(true).alias("target_exists")])? // add flag for matching target
Review Comment:
This is an earlier commit; I just put a projected column here as a
placeholder. I will update the PR.
##
datafusion/sql/src/statement.rs:
##
@@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         Ok(plan)
     }
+    fn merge_to_plan(
+        &self,
+        source_table: TableFactor,
+        target_table: TableFactor,
+        on: Box<SQLExpr>,
+        clauses: Vec<MergeClause>,
+    ) -> Result<LogicalPlan> {
+        let mut ctx = PlannerContext::new();
+
+        let target_name: ObjectName;
+        let source_name: ObjectName;
+
+        match target_table {
+            TableFactor::Table {
+                name,
+                alias,
+                args,
+                with_hints,
+                version,
+                with_ordinality,
+                partitions,
+                json_path,
+                sample,
+                index_hints,
+            } => {
+                target_name = name;
+            }
+            _ => return plan_err!("Target table can only be a table for MERGE."),
+        }
+
+        let target_ref = self.object_name_to_table_reference(target_name)?;
+        let target_src = self.context_provider.get_table_source(target_ref.clone())?;
+        let target_scan =
+            LogicalPlanBuilder::scan(target_ref.clone(), Arc::clone(&target_src), None)?
+                .project(projected_columns, lit(true).alias("target_exists")])? // add flag for matching target
+                .build()?;
+
+        match source_table {
+            TableFactor::Table {
+                name,
+                alias,
+                args,
+                with_hints,
+                version,
+                with_ordinality,
+                partitions,
+                json_path,
+                sample,
+                index_hints,
+            } => {
+                source_name = name;
+            }
+            _ => {
+                return plan_err!("Source table can currently only be a table for MERGE.")
+            }
+        }
+
+        let source_ref = self.object_name_to_table_reference(source_name)?;
+        let source_src = self.context_provider.get_table_source(source_ref.clone())?;
+        let source_scan =
+            LogicalPlanBuilder::scan(source_ref.clone(), Arc::clone(&source_src), None)?
+                .project(projected_columns, lit(true).alias("source_exists")])? // add flag for matching source
+                .build()?;
+
+        let target_schema = target_scan.schema();
+
+        let joined_schema = DFSchema::from(
+            target_scan.schema().join(source_scan.schema())?
+        );
+
+        let on_df_expr = self.sql_to_expr(
+            *on,
+            &joined_schema,
+            &mut ctx,
+        )?;
+
+        let join_plan = LogicalPlan::Join(PlanJoin {
+            left: Arc::new(target_scan.clone()),
+            right: Arc::new(source_scan.clone()),
+            on: vec![],
+            filter: Some(on_df_expr),
+            join_type: JoinType::Full,
+            join_constraint: JoinConstraint::On,
+            schema: Arc::new(target_scan.schema().join(source_scan.schema())?),
+            null_equals_null: false,
+        });
+
+        // Flag checks for both tables
+        let both_not_null = col("target_exists").is_not_null()
+            .and(col("source_exists").is_not_null());
+        let only_source = col("target_exists").is_null()
+            .and(col("source_exists").is_not_null());
+        let only_target = col("target_exists").is_not_null()
+
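The flag predicates at the end of this hunk (`both_not_null`, `only_source`, `only_target`) split the full-join output into MERGE's three match categories. A minimal plain-Rust sketch of the same three-way split, with `Option<bool>` standing in for a nullable flag column and a hypothetical `Branch` enum naming the categories. Because the diff is cut off at `only_target`, its second condition (`source_exists` IS NULL) is an assumption here.

```rust
/// MERGE branches that a joined row can fall into (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Branch {
    Matched,            // `both_not_null`: both flags are non-NULL
    NotMatchedByTarget, // `only_source`: only the source flag is non-NULL
    NotMatchedBySource, // `only_target`: only the target flag is non-NULL (assumed)
}

/// Classify one joined row from its two flag columns (`None` = NULL).
fn classify(target_exists: Option<bool>, source_exists: Option<bool>) -> Option<Branch> {
    match (target_exists.is_some(), source_exists.is_some()) {
        (true, true) => Some(Branch::Matched),
        (false, true) => Some(Branch::NotMatchedByTarget),
        (true, false) => Some(Branch::NotMatchedBySource),
        // Not produced by a full join of two flagged scans: every output row
        // comes from at least one side.
        (false, false) => None,
    }
}

fn main() {
    for (t, s) in [(Some(true), Some(true)), (None, Some(true)), (Some(true), None)] {
        println!("{:?}, {:?} -> {:?}", t, s, classify(t, s));
    }
}
```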
