[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 17: Verified+1

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 17
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Thu, 05 Sep 2024 01:01:04 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 17:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10929/ DRY_RUN=false

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 17
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Wed, 04 Sep 2024 20:01:16 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 17: Code-Review+2

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 17
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Wed, 04 Sep 2024 20:01:15 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 16:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/16874/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 16
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Wed, 04 Sep 2024 17:05:05 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 15:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/16866/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 15
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Wed, 04 Sep 2024 14:25:03 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded a new patch set (#15). ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

IMPALA-12732: Add support for MERGE statements for Iceberg tables

The MERGE statement is a DML command that allows users to perform conditional insert, update, or delete operations on a target table based on the results of a join with a source table. This change adds MERGE statement parsing and Iceberg-specific semantic analysis, planning, and execution.

The parsing grammar follows the SQL standard. It accepts the same syntax as Hive, Spark, and Trino: an arbitrary number of WHEN clauses, with or without conditions, and inline views as the source.

Example:
'MERGE INTO target t USING source s ON t.id = s.id
WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1
WHEN MATCHED AND t.id > 100 THEN DELETE
WHEN MATCHED THEN UPDATE SET column1 = "value"
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);'

The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: the analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case, so the join condition should be an equi-join. The join order is forced by a query hint, which guarantees that the target table is always on the left side.

A new IcebergMergeNode is added in the planning phase; this node does the row-level filtering for each MATCHED/NOT MATCHED case. The 'row_present' column decides which case group is evaluated: if both sides are present, the matched cases and their filter expressions are evaluated over the row; if only the source side is present, the not matched cases and their filter expressions are evaluated. If one of the cases matches, the execution evaluates the result expressions into the output row batch, and an auxiliary tuple stores the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink routes each incoming row from IcebergMergeNode to its respective destination. Each row can go to the delete sink, the insert sink, or both sinks.

Target-side duplicate records are checked during IcebergMergeNode's execution: if a target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user.

Added tests:
 - Parser tests
 - Analyzer tests
 - Unit test for WHEN NOT MATCHED INSERT column collation
 - Planner tests for partitioned/sorted cases
 - Authorization tests
 - E2E tests

Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/exec-node.cc
A be/src/exec/iceberg-merge-node.cc
A be/src/exec/iceberg-merge-node.h
A be/src/exec/iceberg-merge-sink.cc
A be/src/exec/iceberg-merge-sink.h
M be/src/service/client-request-state.cc
M common/thrift/DataSinks.thrift
M common/thrift/PlanNodes.thrift
M common/thrift/Types.thrift
M fe/src/main/cup/sql-parser.cup
M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeCase.java
A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
A fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java
M fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/PlannerContext.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M fe/src/main/jflex/sql-scanner.flex
M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
A f
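The case evaluation and sink routing described in the commit message above can be illustrated with a small Python sketch. This is not the Impala implementation: the names `MergeAction`, `MergeCase`, `evaluate_cases`, and `run_merge` are hypothetical, rows are plain dicts, and two behaviours are assumptions rather than statements from the patch: the first case whose condition holds wins, and an UPDATE is routed to both the delete and the insert sink.

```python
from dataclasses import dataclass
from enum import Flag
from typing import Callable, Optional


class MergeAction(Flag):
    DELETE = 1
    INSERT = 2
    UPDATE = DELETE | INSERT  # assumption: update = delete old row + insert new row


@dataclass
class MergeCase:
    action: MergeAction
    condition: Callable[[Optional[dict], Optional[dict]], bool]
    result: Callable[[Optional[dict], Optional[dict]], Optional[dict]]


def evaluate_cases(target, source, matched_cases, not_matched_cases):
    """Pick the case group from row presence, then the first case that passes."""
    if source is None:
        return None  # target-only row: neither case group applies in this sketch
    cases = matched_cases if target is not None else not_matched_cases
    for case in cases:
        if case.condition(target, source):
            return case.action, case.result(target, source)
    return None


def run_merge(joined_rows, matched_cases, not_matched_cases):
    """Route each (target, source) pair to the delete and/or insert sink."""
    delete_sink, insert_sink = [], []
    for target, source in joined_rows:
        outcome = evaluate_cases(target, source, matched_cases, not_matched_cases)
        if outcome is None:
            continue
        action, new_row = outcome
        if action & MergeAction.DELETE:
            delete_sink.append(target)
        if action & MergeAction.INSERT:
            insert_sink.append(new_row)
    return delete_sink, insert_sink


# The WHEN clauses of the example statement, expressed as hypothetical cases.
MATCHED_CASES = [
    MergeCase(MergeAction.UPDATE, lambda t, s: t["id"] < 100,
              lambda t, s: {**t, "column1": s["column1"]}),
    MergeCase(MergeAction.DELETE, lambda t, s: t["id"] > 100,
              lambda t, s: None),
    MergeCase(MergeAction.UPDATE, lambda t, s: True,
              lambda t, s: {**t, "column1": "value"}),
]
NOT_MATCHED_CASES = [
    MergeCase(MergeAction.INSERT, lambda t, s: True,
              lambda t, s: {"id": s["id"], "column1": s["column1"]}),
]
```

With these cases, a matched row with `id = 100` fails the first two conditions and falls through to the unconditional UPDATE, so it reaches both sinks.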
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 14:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/16853/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 14
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Tue, 03 Sep 2024 13:43:47 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded a new patch set (#14). ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

IMPALA-12732: Add support for MERGE statements for Iceberg tables

The MERGE statement is a DML command that allows users to perform conditional insert, update, or delete operations on a target table based on the results of a join with a source table. This change adds MERGE statement parsing and Iceberg-specific semantic analysis, planning, and execution.

The parsing grammar follows the SQL standard. It accepts the same syntax as Hive, Spark, and Trino: an arbitrary number of WHEN clauses, with or without conditions, and inline views as the source.

Example:
'MERGE INTO target t USING source s ON t.id = s.id
WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1
WHEN MATCHED AND t.id > 100 THEN DELETE
WHEN MATCHED THEN UPDATE SET column1 = "value"
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);'

The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: the analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case, so the join condition should be an equi-join. The join order is forced by a query hint, which guarantees that the target table is always on the left side.

A new IcebergMergeNode is added in the planning phase; this node does the row-level filtering for each MATCHED/NOT MATCHED case. The 'row_present' column decides which case group is evaluated: if both sides are present, the matched cases and their filter expressions are evaluated over the row; if only the source side is present, the not matched cases and their filter expressions are evaluated. If one of the cases matches, the execution evaluates the result expressions into the output row batch, and an auxiliary tuple stores the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink routes each incoming row from IcebergMergeNode to its respective destination. Each row can go to the delete sink, the insert sink, or both sinks.

Target-side duplicate records are checked during IcebergMergeNode's execution: if a target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user.

Added tests:
 - Parser tests
 - Analyzer tests
 - Unit test for WHEN NOT MATCHED INSERT column collation
 - Planner tests for partitioned/sorted cases
 - Authorization tests
 - E2E tests

Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/exec-node.cc
A be/src/exec/iceberg-merge-node.cc
A be/src/exec/iceberg-merge-node.h
A be/src/exec/iceberg-merge-sink.cc
A be/src/exec/iceberg-merge-sink.h
M be/src/service/client-request-state.cc
M common/thrift/DataSinks.thrift
M common/thrift/PlanNodes.thrift
M common/thrift/Types.thrift
M fe/src/main/cup/sql-parser.cup
M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeCase.java
A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
A fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java
M fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/PlannerContext.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M fe/src/main/jflex/sql-scanner.flex
M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
A f
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 13: Code-Review+1

(9 comments)

Left a few comments, but I think we are close to the finish line :)

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h
File be/src/exec/iceberg-merge-node.h:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@110
PS11, Line 110: ergeCase*> matched_cases_;
> It would require the null checks to be moved into EvaluateCases, as the res
I meant we could just save 'Tuple* previous_row_target_tuple' instead of 'last_row_'. And IsDuplicateRow() could still have 'TupleRow* actual_row' as its parameter. Duplicate check could be a bit faster and simpler, but feel free to ignore this comment if you think it doesn't make too much sense.

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@134
PS11, Line 134:
> The other way to initialize non-trivial object is a separate line like 'con
Ack

http://gerrit.cloudera.org:8080/#/c/21423/13/be/src/exec/iceberg-merge-node.cc
File be/src/exec/iceberg-merge-node.cc:

http://gerrit.cloudera.org:8080/#/c/21423/13/be/src/exec/iceberg-merge-node.cc@251
PS13, Line 251: int target_tuple_idx
Is 'target_tuple_idx' needed? It's always target_tuple_idx_.

http://gerrit.cloudera.org:8080/#/c/21423/13/be/src/exec/iceberg-merge-node.cc@251
PS13, Line 251: TupleRow* previous_row
Is 'previous_row' needed? It's always last_row_.

http://gerrit.cloudera.org:8080/#/c/21423/13/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
File fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java:

http://gerrit.cloudera.org:8080/#/c/21423/13/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@134
PS13, Line 134: }
nit: missing empty line after this

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@258
PS11, Line 258:
> Unfortunately, it's required, if it's a simple slash (/), then in terminate
I see, since this SQL statement generates a parse error anyway, you could just write "* /", or use /// instead of /** .. */ for the comments.

http://gerrit.cloudera.org:8080/#/c/21423/13/fe/src/main/java/org/apache/impala/planner/Planner.java
File fe/src/main/java/org/apache/impala/planner/Planner.java:

http://gerrit.cloudera.org:8080/#/c/21423/13/fe/src/main/java/org/apache/impala/planner/Planner.java@142
PS13, Line 142: {
nit: missing space

http://gerrit.cloudera.org:8080/#/c/21423/13/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-long.test
File testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-long.test:

http://gerrit.cloudera.org:8080/#/c/21423/13/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-long.test@535
PS13, Line 535: row_regex:.*[0-9]+
Would it be possible to check actual results? I see currently you are using random, but maybe we could switch to dummy data.

http://gerrit.cloudera.org:8080/#/c/21423/13/tests/stress/test_merge_stress.py
File tests/stress/test_merge_stress.py:

http://gerrit.cloudera.org:8080/#/c/21423/13/tests/stress/test_merge_stress.py@100
PS13, Line 100: new_total
new_total seems unnecessary

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 13
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Fri, 16 Aug 2024 10:31:48 +
Gerrit-HasComments: Yes
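The duplicate-check suggestion in the review above (remember only the previous row's target tuple instead of the whole previous row) can be sketched in Python as follows. This is an illustration, not the backend code: `TargetDuplicateChecker` and `MergeDuplicateError` are hypothetical names, rows are modelled as tuples of Python objects, and identity comparison (`is`) stands in for whatever comparison the C++ node actually performs. The sketch also assumes that rows joined to the same target row arrive next to each other.

```python
class MergeDuplicateError(Exception):
    """Raised when the same target row shows up in two consecutive joined rows."""


class TargetDuplicateChecker:
    def __init__(self, target_tuple_idx):
        # Index of the target table's tuple inside each joined row.
        self.target_tuple_idx = target_tuple_idx
        # Only the previous target tuple is kept, not the whole previous row.
        self.previous_target_tuple = None

    def is_duplicate_row(self, row):
        target = row[self.target_tuple_idx]
        if target is None:
            # Not-matched row: there is no target tuple to compare.
            return False
        duplicate = target is self.previous_target_tuple
        self.previous_target_tuple = target
        return duplicate


def check_rows(rows, target_tuple_idx=0):
    """Stop at the first target-side duplicate, as the statement would."""
    checker = TargetDuplicateChecker(target_tuple_idx)
    for row in rows:
        if checker.is_duplicate_row(row):
            raise MergeDuplicateError("duplicate target row: %r" % (row[target_tuple_idx],))
    return len(rows)
```

The null check lives inside `is_duplicate_row`, which mirrors the point raised in the thread that a joined row may carry no target tuple in the not matched case.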
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 13:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/16637/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 13
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Wed, 07 Aug 2024 15:24:56 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Hello Daniel Becker, Gabor Kaszab, Zoltan Borok-Nagy, Noemi Pap-Takacs, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/21423

to look at the new patch set (#13).

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

IMPALA-12732: Add support for MERGE statements for Iceberg tables

The MERGE statement is a DML command that allows users to perform conditional insert, update, or delete operations on a target table based on the results of a join with a source table. This change adds MERGE statement parsing and Iceberg-specific semantic analysis, planning, and execution.

The parsing grammar follows the SQL standard. It accepts the same syntax as Hive, Spark, and Trino: an arbitrary number of WHEN clauses, with or without conditions, and inline views as the source.

Example:
'MERGE INTO target t USING source s ON t.id = s.id
WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1
WHEN MATCHED AND t.id > 100 THEN DELETE
WHEN MATCHED THEN UPDATE SET column1 = "value"
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);'

The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: the analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case, so the join condition should be an equi-join. The join order is forced by a query hint, which guarantees that the target table is always on the left side.

A new IcebergMergeNode is added in the planning phase; this node does the row-level filtering for each MATCHED/NOT MATCHED case. The 'row_present' column decides which case group is evaluated: if both sides are present, the matched cases and their filter expressions are evaluated over the row; if only the source side is present, the not matched cases and their filter expressions are evaluated. If one of the cases matches, the execution evaluates the result expressions into the output row batch, and an auxiliary tuple stores the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink routes each incoming row from IcebergMergeNode to its respective destination. Each row can go to the delete sink, the insert sink, or both sinks.

Target-side duplicate records are checked during IcebergMergeNode's execution: if a target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user.

Added tests:
 - Parser tests
 - Analyzer tests
 - Unit test for WHEN NOT MATCHED INSERT column collation
 - Planner tests for partitioned/sorted cases
 - Authorization tests
 - E2E tests

Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/exec-node.cc
A be/src/exec/iceberg-merge-node.cc
A be/src/exec/iceberg-merge-node.h
A be/src/exec/iceberg-merge-sink.cc
A be/src/exec/iceberg-merge-sink.h
M be/src/service/client-request-state.cc
M common/thrift/DataSinks.thrift
M common/thrift/PlanNodes.thrift
M common/thrift/Types.thrift
M fe/src/main/cup/sql-parser.cup
M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeCase.java
A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
A fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java
M fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/PlannerContext.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M fe/src/main/jflex/sql-scanner.flex
M fe/s
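To make the 'row_present' idea from the commit message above concrete, here is a minimal Python sketch of a full outer equi-join that tags every output row with which side is present. It is not the query Impala generates: the function name `full_outer_equi_join` and the string labels "BOTH", "TARGET", and "SOURCE" are assumptions made for illustration, and the real encoding of 'row_present' is not shown in this thread.

```python
from collections import defaultdict


def full_outer_equi_join(target_rows, source_rows, key):
    """Join target (left side) and source on equality of `key`.

    Returns (row_present, target_row, source_row) triples, where
    row_present is "BOTH", "TARGET", or "SOURCE" (hypothetical labels).
    """
    # Index the source rows by join key; NULL (None) keys never match.
    source_by_key = defaultdict(list)
    for idx, source in enumerate(source_rows):
        if source[key] is not None:
            source_by_key[source[key]].append(idx)

    matched_source_idxs = set()
    joined = []
    for target in target_rows:
        idxs = source_by_key.get(target[key], []) if target[key] is not None else []
        if not idxs:
            joined.append(("TARGET", target, None))
        for idx in idxs:
            matched_source_idxs.add(idx)
            joined.append(("BOTH", target, source_rows[idx]))

    # Source rows that matched no target row are the NOT MATCHED candidates.
    for idx, source in enumerate(source_rows):
        if idx not in matched_source_idxs:
            joined.append(("SOURCE", None, source))
    return joined
```

When two source rows share a key with one target row, that target row appears in two consecutive "BOTH" rows, which is the situation the commit message says is reported as an error.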
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 12:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/16636/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 12
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Wed, 07 Aug 2024 13:50:15 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 12:

(39 comments)

Thank you Noemi and Zoltan!

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h
File be/src/exec/iceberg-merge-node.h:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@22
PS11, Line 22: #include "exec/exec-node.h"
> I don't think we need this #include here.
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@70
PS11, Line 70: /// the output row should be updated, deleted or inserted.
: TTupleId merge_action_tuple_id_ = -1;
:
> nit: Comment could fit two lines only.
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@110
PS11, Line 110: ergeCase*> matched_cases_;
> Maybe it's safer to and simpler to use Tuple* only that points to the targe
It would require the null checks to be moved into EvaluateCases, as the result set may contain a tuple row where the target table's tuple is null (not matched case).

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@134
PS11, Line 134:
> nit: Is 'inline' keyword needed?
The other way to initialize a non-trivial object is a separate line like 'const ColumnType IcebergMergeNode::merge_action_tuple_type_ = ColumnType(TYPE_TINYINT);'. With this form, it's more compact, but it also generates an init guard for cxx_global_var_init.

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc
File be/src/exec/iceberg-merge-node.cc:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc@167
PS11, Line 167: *e
> nit: +4 indent needed
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc@180
PS11, Line 180: auto row = iter.Get();
:
> nit: could be moved after duplicate check
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-sink.cc
File be/src/exec/iceberg-merge-sink.cc:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-sink.cc@108
PS11, Line 108: AddRow(dele
> DispatchRow() just adds the row to the row batch, so maybe simply 'AddRow()
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
File fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java@312
PS11, Line 312:
> nit: missing space
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
File fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@122
PS11, Line 122: tBase.checkTypeCompatibility(
: tableRef.g
> nit: no need for line break
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@61
PS11, Line 61: row,
> row/record?
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@122
PS11, Line 122: ergeActionTuple(analyzer);
> Hard to read because of the negations. MergeStmt could have a 'hasMacthedCa
Renamed.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@245
PS11, Line 245: return new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_,
:
> nit: missing newline
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@258
PS11, Line 258: ulat
> nit: /
Unfortunately, it's required: if it's a simple slash (/), then it terminates the comment block.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@261
PS11, Line 261: * SELECT /* +straight_join */
: * CAST(TupleIsNull(0) + TupleIsNull(1) *
> These are only needed if there's a DELETE sink. It's not a problem to alway
Added sub-task: IMPALA-13205

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@66
PS11, Line 66: deleteTableId_ = analyzer.getDescTbl().addTargetTable(icePosDelTable_);
: IcebergUtil.valid
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded a new patch set (#12). ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

IMPALA-12732: Add support for MERGE statements for Iceberg tables

The MERGE statement is a DML command that allows users to perform
conditional insert, update, or delete operations on a target table
based on the results of a join with a source table. This change adds
MERGE statement parsing and Iceberg-specific semantic analysis,
planning, and execution. The parsing grammar follows the SQL standard;
it accepts the same syntax as Hive, Spark, and Trino by supporting an
arbitrary number of WHEN clauses, with or without conditions, and by
accepting inline views as the source.

Example:
'MERGE INTO target t USING source s ON t.id = s.id
WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1
WHEN MATCHED AND t.id > 100 THEN DELETE
WHEN MATCHED THEN UPDATE SET column1 = "value"
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);'

The Iceberg-specific analysis, planning, and execution are based on a
concept that was previously used for UPDATE: the analyzer creates a
SELECT statement with all target and source columns (including
Iceberg's virtual columns) and a 'row_present' column that defines
whether the source, the target, or both rows are present in the result
set after joining the two table references by the ON clause. The join
is a FULL OUTER JOIN, and Impala currently supports only equi-joins for
that join type, so the join condition must be an equi-join. The join
order is forced by a query hint, which guarantees that the target table
is always on the left side.

A new plan node, IcebergMergeNode, is added in the planning phase; it
does the row-level filtering for each MATCHED / NOT MATCHED case. The
'row_present' column decides which case group is evaluated: if both
sides are present, the MATCHED cases and their filter expressions are
evaluated over the row; if only the source side is present, the NOT
MATCHED cases and their filter expressions are evaluated. If one of the
cases matches, the execution evaluates the result expressions into the
output row batch, and an auxiliary tuple stores the merge action. The
merge action is a flag for the newly added IcebergMergeSink; this sink
routes each incoming row from IcebergMergeNode to its destination. Each
row can go to the delete sink, the insert sink, or both.

Target-side duplicate records are detected during IcebergMergeNode's
execution; if a target-side duplicate is found, the whole statement's
execution is stopped and the error is reported back to the user.

Added tests:
- Parser tests
- Analyzer tests
- Unit test for WHEN NOT MATCHED INSERT column collation
- Planner tests for partitioned/sorted cases
- Authorization tests
- E2E tests

Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/exec-node.cc
A be/src/exec/iceberg-merge-node.cc
A be/src/exec/iceberg-merge-node.h
A be/src/exec/iceberg-merge-sink.cc
A be/src/exec/iceberg-merge-sink.h
M be/src/service/client-request-state.cc
M common/thrift/DataSinks.thrift
M common/thrift/PlanNodes.thrift
M common/thrift/Types.thrift
M fe/src/main/cup/sql-parser.cup
M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeCase.java
A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
A fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java
M fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/PlannerContext.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M fe/src/main/jflex/sql-scanner.flex
M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
A fe/src/test/java/org/apache/impala/analysis/MergeInsertTest.java
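The case selection and sink routing described in the commit message above can be sketched in a few lines of Java. This is only an illustration, not the patch's code: the `RowPresent` and `MergeAction` enums, the `MergeCase` class, and both method names are hypothetical, rows are reduced to their `id`, and three behaviours are assumptions rather than statements from the commit message: the first case whose filter passes wins, UPDATE is the action that reaches both sinks, and target-only rows are left untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class MergeRoutingSketch {
  // Hypothetical stand-in for the 'row_present' column of the joined row.
  enum RowPresent { BOTH, SOURCE_ONLY, TARGET_ONLY }

  // Hypothetical merge action values kept in the auxiliary tuple.
  enum MergeAction { UPDATE, DELETE, INSERT }

  // One WHEN clause: a filter over the row (here just its id) and an action.
  static final class MergeCase {
    final IntPredicate filter;
    final MergeAction action;

    MergeCase(IntPredicate filter, MergeAction action) {
      this.filter = filter;
      this.action = action;
    }
  }

  // Picks the case group from row_present, then returns the action of the
  // first case whose filter passes. Returns null when no case applies.
  static MergeAction selectAction(RowPresent present, int id,
      List<MergeCase> matched, List<MergeCase> notMatched) {
    List<MergeCase> group;
    if (present == RowPresent.BOTH) {
      group = matched;
    } else if (present == RowPresent.SOURCE_ONLY) {
      group = notMatched;
    } else {
      return null;  // Assumption: target-only rows are left untouched.
    }
    for (MergeCase c : group) {
      if (c.filter.test(id)) return c.action;
    }
    return null;
  }

  // Maps a merge action to the sinks that receive the row.
  // Assumption: UPDATE is a delete of the old row plus an insert of the new one.
  static List<String> route(MergeAction action) {
    List<String> sinks = new ArrayList<>();
    if (action == MergeAction.DELETE || action == MergeAction.UPDATE) sinks.add("delete");
    if (action == MergeAction.INSERT || action == MergeAction.UPDATE) sinks.add("insert");
    return sinks;
  }
}
```

With the cases from the example statement (id < 100 updates, id > 100 deletes, any other match updates, not matched inserts), a matched row with id 150 selects DELETE and goes to the delete sink only, while a matched row with id 100 falls through to the unconditional UPDATE and goes to both sinks.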
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Noemi Pap-Takacs has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 11:

(2 comments)

Just a quick glance. Very nice work!

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@66
PS11, Line 66: String updateMode = originalTargetTable_.getIcebergApiTable().properties().get(
             : TableProperties.UPDATE_MODE);
             : if (updateMode != null && !updateMode.equals("merge-on-read")) {
             : throw new AnalysisException(String.format("Unsupported update mode: '%s' for " +
             : "Iceberg table: %s", updateMode, originalTargetTable_.getFullName()));
Similarly to this check, the analyze() function of IcebergMergeImpl.java should also check the MERGE_MODE.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/service/Frontend.java
File fe/src/main/java/org/apache/impala/service/Frontend.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/service/Frontend.java@2794
PS11, Line 2794: DELETE, UPDATE
You could mention MERGE in the comment.

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 11
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Thu, 11 Jul 2024 14:53:34 +
Gerrit-HasComments: Yes
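The merge-mode check suggested in the review comment above might look roughly like the following. This is a minimal sketch under stated assumptions, not the code that ended up in IcebergMergeImpl: the class and method names are hypothetical, the table properties are passed as a plain map instead of an Iceberg table object, `IllegalArgumentException` stands in for Impala's `AnalysisException`, and the error text is made up by analogy with the quoted update-mode check. The key `write.merge.mode` is the Iceberg table property behind `TableProperties.MERGE_MODE`.

```java
import java.util.Map;

public class MergeModeCheckSketch {
  // Iceberg table property that selects the write mode for MERGE.
  static final String MERGE_MODE = "write.merge.mode";

  // Rejects tables whose merge mode is explicitly set to something other
  // than merge-on-read; an unset property is accepted, as in the quoted
  // update-mode check.
  static void checkMergeMode(Map<String, String> tableProperties, String tableName) {
    String mergeMode = tableProperties.get(MERGE_MODE);
    if (mergeMode != null && !mergeMode.equals("merge-on-read")) {
      // Stand-in for AnalysisException; the message wording is an assumption.
      throw new IllegalArgumentException(String.format(
          "Unsupported merge mode: '%s' for Iceberg table: %s", mergeMode, tableName));
    }
  }
}
```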
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 11:

(20 comments)

A concurrent test and a larger-scale test would be nice. Otherwise the change looks great!

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
File fe/src/main/java/org/apache/impala/analysis/MergeCase.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@89
PS11, Line 89: protected SlotRef createSlotRef(Analyzer analyzer, String colName)
             : throws AnalysisException {
             : List path = Path.createRawPath(targetTableRef_.getUniqueAlias(), colName);
             : SlotRef ref = new SlotRef(path);
             : ref.analyze(analyzer);
             : return ref;
             : }
We also have such a method in IcebergModifyImpl. Can we move it to some common place?

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@115
PS11, Line 115: return (MergeCase) super.clone();
StatementBase.clone() throws NotImplementedException. I think it'd be cleaner to throw it here as well, or to provide a proper implementation that copies the members. We usually implement this with the help of a copy ctor, see e.g. InsertStmt.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
File fe/src/main/java/org/apache/impala/analysis/MergeDelete.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java@49
PS11, Line 49: for (Column col : targetTableColumns_) {
             : resultExprs_.add(createSlotRef(analyzer, col.getName()));
             : }
Could you please add a comment explaining why we need all the target table columns, and not only the position delete columns?

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
File fe/src/main/java/org/apache/impala/analysis/MergeInsert.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@125
PS11, Line 125: // Comparing by reference
              : boolean emptyColumnPermutation = columnPermutation_ == Collections.EMPTY_LIST;
Why do you compare by reference? columnPermutation_.isEmpty() feels safer and not too inefficient either.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@188
PS11, Line 188: columnNameFrequency
It doesn't store the frequency as it is not a map. So how about just 'columnNames'?

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@188
PS11, Line 188: // name
Unnecessary comment: // name

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
File fe/src/main/java/org/apache/impala/analysis/MergeStmt.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@34
PS11, Line 34: /
nit: you could add a space before this

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@36
PS11, Line 36: additional filter expression
nit: "an additional filter expression" / "additional filter expressions"

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@69
PS11, Line 69:
nit: 4 spaces are enough I think. That will also be consistent with L83

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@80
PS11, Line 80: setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
Does it work? (I know it doesn't work for UPDATE, for example.) If it works, we should have tests for this. If it doesn't, let's just create a Jira to track this.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@101
PS11, Line 101: super.collectTableRefs(tblRefs);
nit: I think it's good practice to call super methods first; this way we can guarantee they are being invoked (e.g. imagine you add a return statement later to the method body). Probably the only exceptions to this rule are close()/free() and similar methods, where we need to free the derived class's resources first.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
File fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java@98
PS11, Line 98: private SlotRef disambiguateLhs(SlotRef lhs) throws AnalysisException {
Please add comment to thi
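The difference between the reference comparison and `isEmpty()` raised in the MergeInsert.java comment above can be shown with a small standalone sketch. The class and method names here are hypothetical and only mirror the quoted line; they are not taken from the patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmptyPermutationSketch {
  // Mirrors the quoted check: true only when the argument is the shared
  // Collections.EMPTY_LIST instance itself.
  static boolean isEmptyByReference(List<String> columnPermutation) {
    return columnPermutation == Collections.EMPTY_LIST;
  }

  // The reviewer's suggestion: true for any list that has no elements.
  static boolean isEmptyByContent(List<String> columnPermutation) {
    return columnPermutation.isEmpty();
  }

  public static void main(String[] args) {
    // A freshly allocated empty list has no elements but is a different object.
    List<String> freshEmpty = new ArrayList<>();
    System.out.println(isEmptyByReference(freshEmpty));
    System.out.println(isEmptyByContent(freshEmpty));
  }
}
```

The two checks only agree when every empty permutation is the shared constant. If the code needs to tell "no column list was given" apart from "an explicitly empty column list was given", the reference comparison carries that extra meaning and `isEmpty()` does not; whether MergeInsert relies on that distinction is not stated in the thread.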
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables

Patch Set 11:

(22 comments)

Took a look at the first half, I'll continue tomorrow. The code looks great, very readable!

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h
File be/src/exec/iceberg-merge-node.h:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@22
PS11, Line 22: #include "exec-node.inline.h"
I don't think we need this #include here.

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@70
PS11, Line 70: /// The identifier of the merge action tuple that contains the
             : /// information whether the output row should be updated,
             : /// deleted or inserted
nit: The comment could fit in two lines.

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@110
PS11, Line 110: TupleRow* previous_row, TupleRow* actual_row
Maybe it's safer and simpler to use only a Tuple* that points to the target tuple.

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@134
PS11, Line 134: inline
nit: Is the 'inline' keyword needed?

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc
File be/src/exec/iceberg-merge-node.cc:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@186
PS6, Line 186: }
             : last_row_ = row;
> It's a tricky situation; it's easy to print the target table's tuple, but i
I think the best is option 2:

ERROR: Duplicate row found: one target table row matched more than one source row. Target row: (4 4 4 hdfs://localhost:20500/test-warehouse/target_part/data/514806418e9554cc-654b33170005_831375517_data.0.parq 4), Source row: (4 4 4) (4 4 4) (4 4 4) (50)

Option 1 is also acceptable. Option 3 is harder for users to interpret, that's why I prefer option 2.

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc
File be/src/exec/iceberg-merge-node.cc:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc@167
PS11, Line 167:
nit: +4 indent needed

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc@180
PS11, Line 180: auto row_present = row_present_evaluator_->GetTinyIntVal(row);
              : IcebergMergeCase* selected_case = nullptr;
nit: could be moved after duplicate check

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-sink.cc
File be/src/exec/iceberg-merge-sink.cc:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-sink.cc@108
PS11, Line 108: DispatchRow
DispatchRow() just adds the row to the row batch, so maybe simply 'AddRow()', or 'AddRowToBatch()' would be a better name.

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup
File fe/src/main/cup/sql-parser.cup:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup@1020
PS6, Line 1020: table_ref:source
> Table refs can be inline views so queries like `merge into target using (se
I see, cool!

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
File fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java@312
PS11, Line 312: {
nit: missing space

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
File fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@122
PS11, Line 122:
              :
nit: no need for line break

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@61
PS11, Line 61: line
row/record?

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@122
PS11, Line 122: !mergeStmt_.hasNotMatchedCasesOnly()
Hard to read because of the negations. MergeStmt could have a 'hasMatchedCase()' method.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@245
PS11, Line 245:
              :
nit: missing newline

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@258
PS11, Line 258: /
nit: /

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@26
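The duplicate check discussed around `last_row_` in the comments above can be illustrated with a short sketch. It is written in Java rather than the backend's C++, and it is not the IcebergMergeNode implementation: the `TargetRowId` class and `checkNoDuplicates` method are hypothetical, a target row is identified here by a data file path and a row position, and the sketch assumes that joined rows belonging to the same target row arrive next to each other. Source-only rows are modelled as `null`.

```java
import java.util.List;
import java.util.Objects;

public class DuplicateCheckSketch {
  // Hypothetical identity of a target row: data file path plus row position.
  static final class TargetRowId {
    final String filePath;
    final long position;

    TargetRowId(String filePath, long position) {
      this.filePath = filePath;
      this.position = position;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TargetRowId)) return false;
      TargetRowId other = (TargetRowId) o;
      return position == other.position && filePath.equals(other.filePath);
    }

    @Override
    public int hashCode() {
      return Objects.hash(filePath, position);
    }
  }

  // Walks the joined rows and compares each target row with the previous one.
  // A null entry stands for a row that has no target side (source-only).
  // Assumption: rows of the same target row are adjacent in the input.
  static void checkNoDuplicates(List<TargetRowId> joinedRows) {
    TargetRowId last = null;
    for (TargetRowId current : joinedRows) {
      if (current != null && current.equals(last)) {
        throw new IllegalStateException(
            "Duplicate row found: one target table row matched more than one source row.");
      }
      last = current;
    }
  }
}
```

Keeping only the previous target row's identity, as the reviewer suggests with a single Tuple*, means the check needs constant extra state, at the cost of depending on the adjacency assumption stated above.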
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 11: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/16442/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 Gerrit-Change-Number: 21423 Gerrit-PatchSet: 11 Gerrit-Owner: Peter Rozsa Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Gabor Kaszab Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Noemi Pap-Takacs Gerrit-Reviewer: Peter Rozsa Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Thu, 27 Jun 2024 09:27:00 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Hello Daniel Becker, Gabor Kaszab, Zoltan Borok-Nagy, Noemi Pap-Takacs, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/21423 to look at the new patch set (#11). Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 10: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/16441/ : Initial code review checks failed. See linked job for details on the failure. -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 Gerrit-Change-Number: 21423 Gerrit-PatchSet: 10 Gerrit-Owner: Peter Rozsa Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Gabor Kaszab Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Noemi Pap-Takacs Gerrit-Reviewer: Peter Rozsa Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Thu, 27 Jun 2024 08:41:01 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 9: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/16440/ : Initial code review checks failed. See linked job for details on the failure. -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 Gerrit-Change-Number: 21423 Gerrit-PatchSet: 9 Gerrit-Owner: Peter Rozsa Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Gabor Kaszab Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Noemi Pap-Takacs Gerrit-Reviewer: Peter Rozsa Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Thu, 27 Jun 2024 08:38:28 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 8: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/16439/ : Initial code review checks failed. See linked job for details on the failure. -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 Gerrit-Change-Number: 21423 Gerrit-PatchSet: 8 Gerrit-Owner: Peter Rozsa Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Gabor Kaszab Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Noemi Pap-Takacs Gerrit-Reviewer: Peter Rozsa Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Thu, 27 Jun 2024 08:36:00 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Hello Daniel Becker, Gabor Kaszab, Zoltan Borok-Nagy, Noemi Pap-Takacs, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/21423 to look at the new patch set (#10). Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Hello Daniel Becker, Gabor Kaszab, Zoltan Borok-Nagy, Noemi Pap-Takacs, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/21423 to look at the new patch set (#9). Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. IMPALA-12732: Add support for MERGE statements for Iceberg tables MERGE statement is a DML command that allows users to perform conditional insert, update, or delete operations on a target table based on the results of a join with a source table. This change adds MERGE statement parsing and an Iceberg-specific semantic analysis, planning, and execution. The parsing grammar follows the SQL standard, it accepts the same syntax as Hive, Spark, and Trino by supporting arbitrary number of WHEN clauses, with conditions or without and accepting inline views as source. Example: 'MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1 WHEN MATCHED AND t.id > 100 THEN DELETE WHEN MATCHED THEN UPDATE SET column1 = "value" WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);' The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: The analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join condition should be an equi-join, as it is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case. The joining order is forced by a query hint, this guarantees that the target table is always on the left side. A new, IcebergMergeNode is added at planning phase, this node does the row-level filtering for each MATCHED/ NOT MATCHED cases. 
The 'row_present' column decides which case group will be evaluated; if both sides are available, the matched cases, if only the source side matches then the not matched cases and their filter expressions will be evaluated over the row. If one of the cases match, then the execution evaluates the result expressions into the output row batch, and an auxiliary tuple will store the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink will route each incoming row from IcebergMergeNode to their respective destination. Each row could go to the delete sink, insert sink, or to both sinks. Target-side duplicate records are filtered during IcebergMergeNode's execution, if one target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user. Added tests: - Parser tests - Analyzer tests - Unit test for WHEN NOT MATCHED INSERT column collation - Planner tests for partitioned/sorted cases - Authorization tests - E2E tests Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/exec-node.cc A be/src/exec/iceberg-merge-node.cc A be/src/exec/iceberg-merge-node.h A be/src/exec/iceberg-merge-sink.cc A be/src/exec/iceberg-merge-sink.h M be/src/service/client-request-state.cc M common/thrift/DataSinks.thrift M common/thrift/PlanNodes.thrift M common/thrift/Types.thrift M fe/src/main/cup/sql-parser.cup M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeCase.java A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java A 
fe/src/main/java/org/apache/impala/analysis/MergeImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java M fe/src/main/java/org/apache/impala/planner/MultiDataSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M fe/src/main/java/org/apache/impala/planner/PlannerContext.java M fe/src/main/java/org/apache/impala/service/Frontend.java M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java M fe/src/main/jflex/sql-scanner.flex M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java A fe/src/test/java/org/apache/impala/analysis/MergeInsertTest
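The 'row_present' mechanism described in the commit message above can be sketched in plain Java. This is only an illustration, not the code from the patch: the class and method names are hypothetical, the numeric encoding assumes the target tuple contributes 1 and the source tuple contributes 2 when NULL (following the TupleIsNull(0) + TupleIsNull(1) * 2 expression quoted in the review), and first-match-wins ordering among the WHEN clauses is an assumption.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Illustrative sketch only: how a 'row_present' value could select a WHEN case group. */
final class RowPresentSketch {
  // Assumed encoding: TupleIsNull(target) + TupleIsNull(source) * 2.
  static final int BOTH = 0;         // target and source rows present: MATCHED cases
  static final int SOURCE_ONLY = 1;  // target tuple is NULL: NOT MATCHED cases
  static final int TARGET_ONLY = 2;  // source tuple is NULL: no case group in this sketch

  static int rowPresent(boolean targetIsNull, boolean sourceIsNull) {
    return (targetIsNull ? 1 : 0) + (sourceIsNull ? 2 : 0);
  }

  /** One WHEN clause: its group, an optional filter, and a label for the action. */
  static final class MergeCase {
    final boolean matched;
    final Predicate<int[]> filter;  // a row is {targetId, sourceId} in this sketch
    final String action;

    MergeCase(boolean matched, Predicate<int[]> filter, String action) {
      this.matched = matched;
      this.filter = filter;
      this.action = action;
    }
  }

  /** Returns the action of the first case whose group and filter accept the row. */
  static Optional<String> evaluate(List<MergeCase> cases, int rowPresentValue, int[] row) {
    if (rowPresentValue != BOTH && rowPresentValue != SOURCE_ONLY) return Optional.empty();
    boolean matched = rowPresentValue == BOTH;
    for (MergeCase c : cases) {
      if (c.matched == matched && c.filter.test(row)) return Optional.of(c.action);
    }
    return Optional.empty();
  }
}
```

With the four WHEN clauses from the example statement, a joined row with t.id = 150 would fall into the second MATCHED case (DELETE), while a source row without a target match would reach the NOT MATCHED INSERT case.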
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded a new patch set (#8). ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. IMPALA-12732: Add support for MERGE statements for Iceberg tables MERGE statement is a DML command that allows users to perform conditional insert, update, or delete operations on a target table based on the results of a join with a source table. This change adds MERGE statement parsing and an Iceberg-specific semantic analysis, planning, and execution. The parsing grammar follows the SQL standard, it accepts the same syntax as Hive, Spark, and Trino by supporting arbitrary number of WHEN clauses, with conditions or without and accepting inline views as source. Example: 'MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1 WHEN MATCHED AND t.id > 100 THEN DELETE WHEN MATCHED THEN UPDATE SET column1 = "value" WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);' The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: The analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join condition should be an equi-join, as it is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case. The joining order is forced by a query hint, this guarantees that the target table is always on the left side. A new, IcebergMergeNode is added at planning phase, this node does the row-level filtering for each MATCHED/ NOT MATCHED cases. 
The 'row_present' column decides which case group will be evaluated; if both sides are available, the matched cases, if only the source side matches then the not matched cases and their filter expressions will be evaluated over the row. If one of the cases match, then the execution evaluates the result expressions into the output row batch, and an auxiliary tuple will store the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink will route each incoming row from IcebergMergeNode to their respective destination. Each row could go to the delete sink, insert sink, or to both sinks. Target-side duplicate records are filtered during IcebergMergeNode's execution, if one target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user. Added tests: - Parser tests - Analyzer tests - Unit test for WHEN NOT MATCHED INSERT column collation - Planner tests for partitioned/sorted cases - Authorization tests - E2E tests Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/exec-node.cc A be/src/exec/iceberg-merge-node.cc A be/src/exec/iceberg-merge-node.h A be/src/exec/iceberg-merge-sink.cc A be/src/exec/iceberg-merge-sink.h M be/src/service/client-request-state.cc M common/thrift/DataSinks.thrift M common/thrift/PlanNodes.thrift M common/thrift/Types.thrift M fe/src/main/cup/sql-parser.cup M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeCase.java A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java A 
fe/src/main/java/org/apache/impala/analysis/MergeImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java M fe/src/main/java/org/apache/impala/planner/MultiDataSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M fe/src/main/java/org/apache/impala/planner/PlannerContext.java M fe/src/main/java/org/apache/impala/service/Frontend.java M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java M fe/src/main/jflex/sql-scanner.flex M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java A fe/src/test/java/org/apache/impala/analysis/MergeInsertTest.java M fe/src/test/java/org/apache/impala/analysis/ParserTest.java M fe/src/test/java/org/apache/impala/planner/PlannerTest.java M fe/
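The routing step that the commit message attributes to IcebergMergeSink (each row goes to the delete sink, the insert sink, or both) might look roughly like the sketch below. The real sink is C++ backend code; this Java version uses hypothetical names, models the sinks as in-memory lists, and assumes that an UPDATE is expressed as the BOTH action (delete the old row, insert the new one).

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only: routing rows by a merge-action flag. */
final class MergeSinkSketch {
  // Hypothetical mirror of the merge action flag stored in the auxiliary tuple.
  enum Action { DELETE, INSERT, BOTH }

  // Stand-ins for the two child sinks; the real sinks write Iceberg files.
  final List<String> deleteSink = new ArrayList<>();
  final List<String> insertSink = new ArrayList<>();

  /** Sends the row to the delete sink, the insert sink, or both. */
  void route(Action action, String row) {
    if (action == Action.DELETE || action == Action.BOTH) deleteSink.add(row);
    if (action == Action.INSERT || action == Action.BOTH) insertSink.add(row);
  }
}
```

Keeping the decision in a single flag means the node that evaluates the WHEN clauses does not need to know how many sinks exist; it only records what should happen to the row.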
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Gabor Kaszab has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 7: (18 comments) I managed to finish the first iteration on the code. Thanks for all this effort, Peter! http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.h File be/src/exec/iceberg-merge-node.h: http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.h@61 PS7, Line 61: ScalarExpr* row_present_ = nullptr; nit: this section below would be more readable with linebreaks included. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.h@103 PS7, Line 103: Status EvaluateCases(RowBatch* output_batch); nit: without linebreaks this part of the class lacks any structure and it's not that easy to read. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.h@148 PS7, Line 148: std::vector filter_conjuncts_; nit: I believe that the order of the members is constants first, then functions then variables http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.h@177 PS7, Line 177: return TIcebergMergeSinkAction::BOTH; DCHECK for type_ == BOTH? http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc File be/src/exec/iceberg-merge-node.cc: http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@43 PS7, Line 43: auto& tmerge_cases = tnode.merge_node.cases; No need to create a variable for this, only used once. We can use the tnode.merge_node.cases directly in the for loop. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@48 PS7, Line 48: RETURN_IF_ERROR(merge_case_plan->Init(tmerge_case, state, row_descriptor_)); Since IcebergMergeCasePlane is not an inherited class, I think we can get rid of the Init() fn and use the constructor for initialization. 
Additionally, we might be able to make the member variables const if populated in the constructor. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@67 PS7, Line 67: RowDescriptor* row_desc row_desc seems to be a simple 'in' parameter. It should be a const ref. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@92 PS7, Line 92: DCHECK(pnode.merge_action_tuple_id_ != -1); nit: some linebreaks would be nice to increase readability with the structure. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@135 PS7, Line 135: RETURN_IF_ERROR(child(0)->Open(state)); Apparently, in Prepare() we didn't have to call Prepare() for the child, but in Open() we do have to delegate it to the child. Could you help me understand why? http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@206 PS7, Line 206: { continue; } nit: no need for the braces http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@217 PS7, Line 217: TupleRow* dst_row = output_batch->GetRow(dst_row_idx); You can merge this line with the one above http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-node.cc@226 PS7, Line 226: TIcebergMergeSinkAction::type action = merge_case->SinkAction(); You declare this too early. There is not even a need for a variable: the expression is short enough that you can use 'merge_case->SinkAction()' at L242. 
http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-sink.cc File be/src/exec/iceberg-merge-sink.cc: http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-sink.cc@43 PS7, Line 43: DCHECK(tsink.child_data_sinks[1].table_sink.action == TSinkAction::DELETE); nit: linebreak after the last DCHECK pls http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-sink.cc@48 PS7, Line 48: const TDataSink& delete_sink = tsink.child_data_sinks[1]; nit: linebreak before this line to separate the code between insert_sink and delete_sink http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-sink.cc@55 PS7, Line 55: auto merge_action = tsink.output_exprs[0]; this could be merged into the line below. It would still fit into one line. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-sink.cc@81 PS7, Line 81: profile()->AddChild(insert_sink_->profile()); I recall that in some use cases it's not necessary to create both sinks. Should we check the sinks for null here? I see other places too where we call functions on the sink pointers without a null check. http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-sink.cc@87 PS7, Line 87: merge_action_evaluator_ = output_expr_evals_[0]; we could do this in the constructor http://gerrit.cloudera.org:8080/#/c/21423/7/be/src/exec/iceberg-merge-sink.cc@108 PS7, Line 108: auto index = delete_rows.AddRow(); : auto outpu
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Gabor Kaszab has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 7: (25 comments) I managed to finish the first iteration on the FE changes plus the tests. Thanks for this huge review! I'll continue with the BE changes. http://gerrit.cloudera.org:8080/#/c/21423/7/common/thrift/PlanNodes.thrift File common/thrift/PlanNodes.thrift: http://gerrit.cloudera.org:8080/#/c/21423/7/common/thrift/PlanNodes.thrift@738 PS7, Line 738: 2: optional list filter_conjuncts Just thinking out loud here: Can't we represent the merge case as a filter conjunct here? E.g. WHEN MATCHED could be 'row_present == 0'. This could be constructed at analysis time, if I'm not missing anything, and then the 'filter_conjuncts' would be required here. And then we might be able to remove the TMergeCaseType too from this struct. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@192 PS7, Line 192: List positionMetaExprs = Expr.cloneList(getPositionMetaExprs()); I'm not sure I get why we have to clone the cases and expressions here. Can't we simply pass them as they are? I'm probably missing something, could you explain? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@195 PS7, Line 195: parent, I find it confusing that a variable named 'parent' is passed to this constructor as a parameter named 'child' on the other side. I might be missing something here, though. 
http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@204 PS7, Line 204: createDataSink similarly to getPlanNode, I don't think creating stuff related to the planner, like data sinks, should be placed into analyzer classes. Update: I see this is needed because MergeStmt has to implement it as an override from DmlStatementBase. Well, I don't like it but then let's keep it :) http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@215 PS7, Line 215: TableSink deleteSink = Not all use cases require an insert/delete sink. Is it necessary to create both of them unconditionally? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java File fe/src/main/java/org/apache/impala/analysis/MergeInsert.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@117 PS7, Line 117: analyzedColumnPermutation I think the name suggests the opposite of what this is for. 'columnsToBeAnalyzed' or something similar would explain the purpose better. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@119 PS7, Line 119: Collectors.toMap(s -> s, o -> columnPermutation_.indexOf(o))); To me it seems overkill to pre-collect the indexes for the column names. It doesn't seem to increase performance but adds extra code complexity. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@122 PS7, Line 122: boolean emptyColumnPermutation = Not sure about this so just asking: For the case of no column permutations provided, wouldn't the code be clearer if we did this: - do a check if column permutations are provided - unconditionally go through the columns of the table and populate 'matches' - return and then do the rest without the IMPLICIT case below. 
http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@138 PS7, Line 138: null Not sure why you don't use the name of the col for the IMPLICIT case. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@143 PS7, Line 143: null)); nit: this could fit into line above http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@183 PS7, Line 183: if (columnPermutationSize > selectListSize) { For me logically the length check could more naturally fit right after the duplicate check. Here you use the size of 'columnMatches' for the IMPLICIT case, however you could use the number of cols instead if you moved this code above the population of 'columnMatches'. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@186 PS7, Line 186: target, below you put these 3 params into a single line, but here you break them into separate lines. Pls keep it consistent. I'd prefer the more compact version. ht
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Gabor Kaszab has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 7: (16 comments) Another batch. I'm almost done with the Analyzer part. Will take another look at MergeInsert and then will move to the Planner. http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@24 PS6, Line 24: > Done This doesn't seem done http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeCase.java File fe/src/main/java/org/apache/impala/analysis/MergeCase.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@28 PS7, Line 28: parent_ > nit: I think naming this mergeStmt_ would be more descriptive than 'parent_ Anyway, I checked the usages for parent_ and apparently we query table names and columns from MergeStmt. To me this violates encapsulation for the MergeCase classes, and I think it would be a better approach not to pass MergeStmt into MergeCase and to pass only the necessary attributes. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@33 PS7, Line 33: protected MergeCase() { filterExprs_ = Collections.emptyList(); } With this implementation we rely on the inherited classes to initialize resultExprs_. This could be unsafe if someone in the future adds code to this base class and assumes that, similarly to filterExprs_, resultExprs_ is also non-null. I'd rather initialize it as an empty list here. Maybe we can do a precondition check before the usages to verify this is not null. 
http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java File fe/src/main/java/org/apache/impala/analysis/MergeInsert.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@33 PS7, Line 33: public class MergeInsert extends MergeCase { comments pls, with examples for better understanding http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@35 PS7, Line 35: /** nit: I think regular, single line comment format is fine for member variables. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@49 PS7, Line 49: nit: extra line http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@65 PS7, Line 65: for (Expr expr : resultExprs_) { nit: fits into single line http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@82 PS7, Line 82: return new MergeInsert(parent_, resultExprs_, getFilterExprs()); This clone doesn't actually clone the internal state of this class just for the superclass. I find this misleading because even though we do a clone we actually lose internal information. At this point of time you know where the clone() fn is actually invoked and you know that some of the members won't be needed from that point on, but it is not guaranteed that in the future cloning won't be called elsewhere. I think this isn't a futureproof approach. I'd prefer actual clone where we populate everything. Also, see other clone() implementations in MergeCase subclasses. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@102 PS7, Line 102: Set duplicateFilteredColumnPermutation = new LinkedHashSet<>( I think duplicate check could be achieved easier than this. 
You just create an empty HashSet, start iterating through columnPermutation_, keep adding the col names to the HashSet and once you find a duplicate you throw an exception. I think it's enough to return an error with the first duplicate found, no need to gather all of them. Not to mention that here you do many iterations on these columns by pre-deduplicating, then copying, then removing. This could be achieved in one iteration even if you want to collect all the dups. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@127 PS7, Line 127: for (int i = 0; i < columns.size(); ++i) { Is the index 'i' used anywhere? Can this be a foreach on 'columns'? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@225 PS7, Line 225: public static class ColumnMatch { This inner class is for internal use, right? Can be private/protected then. Same for ColumnMatches. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@227 PS7, Line 227: private final Column targetTableColumn; nit: '_' suffix http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org
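The single-pass duplicate check suggested in the comment on MergeInsert.java@102 could be sketched as follows. This is a minimal illustration of the reviewer's idea, not code from the patch: the class and method names are hypothetical, column names are modelled as plain strings compared case-sensitively, and IllegalArgumentException stands in for whatever analysis error the frontend would actually raise.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch only: fail on the first duplicate column name. */
final class DuplicateColumnCheck {
  /** Walks the column permutation once and throws on the first repeated name. */
  static void checkNoDuplicates(List<String> columnPermutation) {
    Set<String> seen = new HashSet<>();
    for (String name : columnPermutation) {
      // Set.add returns false when the element is already present.
      if (!seen.add(name)) {
        throw new IllegalArgumentException(
            "Duplicate column in column permutation: " + name);
      }
    }
  }
}
```

Compared with pre-deduplicating, copying, and removing, this needs one pass and one set, and it reports the first offending column immediately.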
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Gabor Kaszab has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 7: (22 comments) doing another dump. Still not finished with the analyzer part, but getting there soon. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java File fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@92 PS7, Line 92: protected void checkSubQuery(SlotRef lhsSlotRef, Expr rhsExpr) These functions, if I don't miss anything, can work without an actual object of this class as they receive all information via params and don't need a state. Can they be static to the class? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@189 PS7, Line 189: (MergeCase) Is it necessary to convert the items to MergeCase. I believe they are of MergeCase type already. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@225 PS7, Line 225: public Expr getRowPresentExpr() { Haven't checked all of these but at least some of them are only called within this class. They could be private then. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@246 PS7, Line 246:* SELECT /* +straight_join */ nit: could you indent this example SQL for better readability? 
http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@247 PS7, Line 247:* CAST(TupleIsNull(0) + TupleIsNull(1) * 2 AS TINYINT) row_present, this is rather: CAST(TupleIsNull(0) + CAST(TupleIsNull(1) * 2 AS TINYINT) 1: pls update the comment 2: do we need the inner cast to TINYINT after we multiply a bool by a tinyint? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@262 PS7, Line 262: targetColumns this is rather 'targetSlotRefs' http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@263 PS7, Line 263: targetTableRef_.getTable() : .getColumns() : .stream() : .map(column : -> new SlotRef( : ImmutableList.of(targetTableRef_.getUniqueAlias(), : column.getName( : .collect(Collectors.toList()); not sure if this is auto-formatted, but I find it more compact and readable if we indent like this: tblRef.getTable.getColumns().stream() .map() .collect() http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@362 PS7, Line 362: sortColumns this is rather sortColumnPositions or sortColumnIndexes, right? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@365 PS7, Line 365: resultExpressions::get could you write expressions_.resultExpressions()::get (or similar) here and get rid of L363? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@372 PS7, Line 372: in the same format nit: what do you mean by 'in the same format'? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@374 PS7, Line 374: protected static class MergeExpressions { If I'm not mistaken, this class is to store intermediate states of this class, but this doesn't really add any extra logic on top. 
I feel that anything that is within this class could be simply members of the outer class. And then we would save all these getters/setters. No strong feelings, though. If we decide to keep this, then I think the members should be public to get rid of these getters/setters. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@378 PS7, Line 378: private final List sortingOrder_; this name is confusing with the one 2 lines below. What if we called this sortingColumnPositions or such? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@391 PS7, Line 391: private List targetExpressions; nit: member names should have a '_' suffix http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@399 PS7, Line 399: public MergeExpressi
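The comments on IcebergMergeImpl.java@362 and @365 concern a list of sort column positions that is mapped to result expressions through a method reference. A small sketch of that pattern, using the compact stream indentation the reviewer prefers, is shown here. The class and parameter names are hypothetical, and the expressions are kept generic rather than using Impala's Expr type.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative sketch only: resolving sort column positions to expressions. */
final class SortExprSketch {
  /** Picks the result expression at each sort column position, in order. */
  static <T> List<T> sortExprs(List<Integer> sortColumnPositions, List<T> resultExpressions) {
    return sortColumnPositions.stream()
        .map(resultExpressions::get)  // each position indexes into the result list
        .collect(Collectors.toList());
  }
}
```

Naming the input 'sortColumnPositions' rather than 'sortColumns' makes it clear at the call site that the list holds indexes, which is the point of the renaming suggestion.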
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Gabor Kaszab has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 7: (25 comments) Let me flush another batch of comments now. I'm somewhere halfway through the analyzer changes. http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup File fe/src/main/cup/sql-parser.cup: http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup@1027 PS6, Line 1027: List cases = Lists.newArrayList(merge_case); nit: I think here we can merge these 2 rows http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@60 PS7, Line 60: public class IcebergMergeImpl extends MergeImpl { As I wrote in MergeStmt, I'm not convinced that we need a separate abstraction for the implementation. One reason is that we don't plan to add any other implementations, and even if we did, the coding should happen the other way around: when we add the other Merge impl, we could introduce the extra level of abstraction as a refactor in a separate commit. The other thing I'm uncomfortable with is that this IcebergMergeImpl now uses all the inner members of MergeStmt; even the MergeStmt itself is passed to the constructor. To me this asks for inheritance rather than an impl member. 
http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@93 PS7, Line 93: if (mergeStmt_.isOnlyMatchedCases()) { : sourceTableRef_.setJoinOp(JoinOperator.INNER_JOIN); : } else { : sourceTableRef_.setJoinOp(JoinOperator.FULL_OUTER_JOIN); : } : sourceTableRef_.setOnClause(on_); : sourceTableRef_.setLeftTblRef(targetTableRef_); This could go into a setJoinParams() function http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@120 PS7, Line 120: icebergPositionalDeleteTable_ = new IcebergPositionDeleteTable( If you have only a WHEN NOT MATCHED THEN INSERT case, do you still have to create a pos del table? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@127 PS7, Line 127: for (MergeCase mergeCase : mergeStmt_.getCases()) { Since all these merge cases live inside MergeStmt, shouldn't this code live in MergeStmt.analyze()? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@131 PS7, Line 131: analyzer.registerPrivReq( Here we require ALL privs on the target table. Shouldn't we also expect SELECT privs on the source table? http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@138 PS7, Line 138: private void addMergeActionTuple(Analyzer analyzer) { Function comment pls http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@140 PS7, Line 140: analyzer.getDescTbl().createTupleDescriptor(MERGE_ACTION_TUPLE_NAME); What is a merge action and why is a tuple needed for it? Could you comment it on the class? 
http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@153 PS7, Line 153: public void substituteResultExprs(ExprSubstitutionMap smap, It stinks for me that MergeStmt also has a fn with similar name but here we inherit this not from there but from another interface, MergeImpl. For me this also indicates that this class should be a child of MergeStmt http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@186 PS7, Line 186: public PlanNode getPlanNode(PlannerContext ctx, PlanNode parent, Creating new PlanNodes should be the responsibility of the Planner, in my opinion. An Analysis class in the Analysis package shouldn't know how a corresponding PlanNode is created. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@204 PS7, Line 204: public DataSink createDataSink() { I think this should be an override. Also true to some other functions in this class. http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeImpl.java File fe/src/main/java/org/apache/impala/analysis/MergeImpl.java: http://gerrit.cloudera.org:8080/#/c/21423/7/fe/src/main/java/org/apache/impala/analysis/MergeImpl.java@20 PS7, Line 20: public abstract QueryStmt prepareQuery(); Since this serves as an interface for the Merge implementations (a
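The setJoinParams() extraction suggested in the comment on IcebergMergeImpl.java@93 could take roughly the following shape. The types here are simplified stand-ins written for this sketch (the real TableRef and JoinOperator classes in Impala carry far more state), and the ON clause is modelled as a string; only the branching logic is taken from the quoted snippet.

```java
/** Illustrative sketch only: choosing the join operator for the merge query. */
final class JoinParamsSketch {
  // Simplified stand-in for Impala's JoinOperator enum.
  enum JoinOperator { INNER_JOIN, FULL_OUTER_JOIN }

  // Simplified stand-in for a table reference with join settings.
  static final class TableRef {
    final String alias;
    JoinOperator joinOp;
    String onClause;
    TableRef leftTblRef;

    TableRef(String alias) { this.alias = alias; }
  }

  /**
   * Joins the source to the target: an inner join suffices when only MATCHED
   * cases exist, otherwise unmatched source rows must survive the join too.
   */
  static void setJoinParams(
      TableRef source, TableRef target, String onClause, boolean onlyMatchedCases) {
    source.joinOp = onlyMatchedCases ? JoinOperator.INNER_JOIN : JoinOperator.FULL_OUTER_JOIN;
    source.onClause = onClause;
    source.leftTblRef = target;  // the target table stays on the left side
  }
}
```

Pulling the three setters into one helper keeps the analysis method focused on the overall flow and gives the join-type decision a name of its own.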
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 7: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/16329/ : Initial code review checks failed. See linked job for details on the failure. -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 Gerrit-Change-Number: 21423 Gerrit-PatchSet: 7 Gerrit-Owner: Peter Rozsa Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Gabor Kaszab Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Noemi Pap-Takacs Gerrit-Reviewer: Peter Rozsa Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Thu, 13 Jun 2024 12:40:06 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 7: (63 comments) Thank you Zoltan and Gabor! I covered most of the comments, some of them are still open. If we could resolve those, I'll split up this change into multiple pieces to make reviewing easier. http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@7 PS6, Line 7: IMPALA-12732: Add support for MERGE statements for Iceberg tables > I know MERGE is a known SQL functionality but I'd find it useful to add a f Done http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@11 PS6, Line 11: ource table. This change adds MERGE > Would you mind adding some examples? Done http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@24 PS6, Line 24: > nit: maybe 'phase' is a better word here Done http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@47 PS6, Line 47: > We should also have authorization tests. Also for fine-grained authz, e.g. Done http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h File be/src/exec/iceberg-merge-node.h: http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@18 PS6, Line 18: #pragma once : > nit: we prefer #pragma once Done http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@43 PS6, Line 43: /// for merge action and for the target table. 
> Please add class comment Done http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@101 PS6, Line 101: /// added to the > We prefer std::unique_ptr Done http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@158 PS6, Line 158: se(con > no need for 'inline', member-functions defined inside class definitions are Done http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc File be/src/exec/iceberg-merge-node.cc: http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@165 PS6, Line 165: os > nit: too much indent Done http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@178 PS6, Line 178: auto row = iter.Ge > I'm not sure if we want to reset last_row_ each time EvaluateCases() is inv Removed http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@186 PS6, Line 186: last_row_ = row; : > Can we also output the duplicated row with PrintTuple()? It's a tricky situation; it's easy to print the target table's tuple, but it also contains the virtual columns, so for small tables, the path will be the dominant part of the error message; I think it's acceptable. To print the source tuples, we have to consider cases where the source row consists of multiple tuples, so first we have to distinguish the source tuples or the tuple that is used for the join, and then print them. There's another quite simple option; using PrintRow. What do you think? Examples: 1. ERROR: Duplicate row found: one target table row matched more than one source row. Target row: (3 3 3 hdfs://localhost:20500/test-warehouse/target_part/data/514806418e9554cc-654b33170005_831375517_data.0.parq 3) 2. ERROR: Duplicate row found: one target table row matched more than one source row. Target row: (4 4 4 hdfs://localhost:20500/test-warehouse/target_part/data/514806418e9554cc-654b33170005_831375517_data.0.parq 4), Source row: (4 4 4) (4 4 4) (4 4 4) (50) 3. 
ERROR: Duplicate row found: one target table row matched more than one source row. Affected row: [(4 4 4 hdfs://localhost:20500/test-warehouse/target_part/data/514806418e9554cc-654b33170005_831375517_data.0.parq 0) (4 4 4) (4 4 4) (4 4 4) (50)] // The source table ref here are 4 joined tables http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@209 PS6, Line 209: if (ReachedLimit() | > nit: to reduce nesting, we could have Done http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@210 PS6, Line 210: } : return Status::OK(); : } : : void IcebergMergeNode::AddRow( : RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* row) { : int dst_row_idx = output_batch->AddRow(); : TupleRow* dst_row = output_batch->GetRow(dst_row_idx); : : auto* target_tuple = : Tuple::Create(row_descriptor_.tuple_descriptors()[target_tuple_idx_]->byte_size(), : output_batch->tuple_data_pool()); : auto* merge_action_tuple = Tuple::Create( : row_descriptor_.tuple_descriptors()[merge_action_tuple_idx_]->byte_size(), : output_batch->tuple_data_pool()); :
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded a new patch set (#7). ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. IMPALA-12732: Add support for MERGE statements for Iceberg tables MERGE statement is a DML command that allows users to perform conditional insert, update, or delete operations on a target table based on the results of a join with a source table. This change adds MERGE statement parsing and an Iceberg-specific semantic analysis, planning, and execution. The parsing grammar follows the SQL standard, it accepts the same syntax as Hive, Spark, and Trino by supporting arbitrary number of WHEN clauses, with conditions or without and accepting inline views as source. Example: 'MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1 WHEN MATCHED AND t.id > 100 THEN DELETE WHEN MATCHED THEN UPDATE SET column1 = "value" WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);' The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: The analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join condition should be an equi-join, as it is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case. The joining order is forced by a query hint, this guarantees that the target table is always on the left side. A new, IcebergMergeNode is added at planning level, this node does the row-level filtering for each MATCHED/ NOT MATCHED cases. 
The 'row_present' column decides which case group will be evaluated; if both sides are available, the matched cases, if only the source side matches then the not matched cases and their filter expressions will be evaluated over the row. If one of the cases match, then the execution evaluates the result expressions into the output row batch, and an auxiliary tuple will store the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink will route each incoming row from IcebergMergeNode to their respective destination. Each row could go to the delete sink, insert sink, or to both sinks. Target-side duplicate records are filtered during IcebergMergeNode's execution, if one target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user. Added tests: - Parser tests - Analyzer tests - Unit test for WHEN NOT MATCHED INSERT column collation - Planner tests for partitioned/sorted cases - Authorization tests - E2E tests Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/exec-node.cc A be/src/exec/iceberg-merge-node.cc A be/src/exec/iceberg-merge-node.h A be/src/exec/iceberg-merge-sink.cc A be/src/exec/iceberg-merge-sink.h M be/src/service/client-request-state.cc M common/thrift/DataSinks.thrift M common/thrift/PlanNodes.thrift M common/thrift/Types.thrift M fe/src/main/cup/sql-parser.cup M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeCase.java A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java A 
fe/src/main/java/org/apache/impala/analysis/MergeImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M fe/src/main/java/org/apache/impala/planner/PlannerContext.java M fe/src/main/java/org/apache/impala/service/Frontend.java M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java M fe/src/main/java/org/apache/impala/util/IcebergUtil.java M fe/src/main/jflex/sql-scanner.flex M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java A fe/src/test/java/org/apache/impala/analysis/MergeInsertTest.java M fe/src/test/java/org/apache/impala/analysis/ParserTest.java M fe/src
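The flow described in the commit message above ('row_present' selects a case group, and the merge action tells the sink where each row goes) can be sketched as follows. This is a simplified illustration, not the patch's code: every enum and method name is hypothetical. It also assumes that an UPDATE is what sends a row to both sinks and that target-only rows fall into no case group; the commit message does not state either point explicitly.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical names throughout; this mirrors the commit message, not the patch.
final class MergeRoutingSketch {
  enum RowPresent { BOTH, SOURCE_ONLY, TARGET_ONLY }
  enum CaseGroup { MATCHED, NOT_MATCHED, NONE }
  enum MergeAction { INSERT, DELETE, UPDATE }
  enum Sink { INSERT_SINK, DELETE_SINK }

  /** Picks the group of WHEN cases to evaluate for a joined row. */
  static CaseGroup caseGroupFor(RowPresent rowPresent) {
    switch (rowPresent) {
      case BOTH: return CaseGroup.MATCHED;
      case SOURCE_ONLY: return CaseGroup.NOT_MATCHED;
      // Assumption: a row with no source match has no applicable case.
      default: return CaseGroup.NONE;
    }
  }

  /** Routes a row to the delete sink, the insert sink, or both. */
  static Set<Sink> sinksFor(MergeAction action) {
    switch (action) {
      case INSERT: return EnumSet.of(Sink.INSERT_SINK);
      case DELETE: return EnumSet.of(Sink.DELETE_SINK);
      // Assumption: an update deletes the old row and inserts the new one.
      default: return EnumSet.of(Sink.INSERT_SINK, Sink.DELETE_SINK);
    }
  }

  public static void main(String[] args) {
    System.out.println(caseGroupFor(RowPresent.SOURCE_ONLY));
    System.out.println(sinksFor(MergeAction.UPDATE));
  }
}
```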
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Gabor Kaszab has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 6: (1 comment) http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test: http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@324 PS6, Line 324: when not matched then insert(id, user) values(source.id, source.user) I admit I haven't checked the code, but is the precedence between these cases set programmatically, or does it depend on the order in which the cases were provided? What I mean is, for instance, we have a 'when matched and ' case and also a more general 'when matched' case. In this order, the first covers the narrower case and then the more general case covers the rows not covered by the first. If we reversed their order, would we get the same results? -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 Gerrit-Change-Number: 21423 Gerrit-PatchSet: 6 Gerrit-Owner: Peter Rozsa Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Gabor Kaszab Gerrit-Reviewer: Noemi Pap-Takacs Gerrit-Reviewer: Peter Rozsa Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Fri, 31 May 2024 14:26:32 + Gerrit-HasComments: Yes
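To make the question above concrete: if cases are evaluated in the order they were written and the first case whose filter passes is chosen, then swapping a narrow case with a general one changes the result. The sketch below assumes exactly that first-match behavior, which this thread does not confirm for the patch. The Case class and select() are hypothetical and unrelated to Impala's MergeCase.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.IntPredicate;

// Hypothetical illustration of first-match case selection.
final class CaseOrderSketch {
  /** A WHEN case reduced to an action name and a filter on the target id. */
  static final class Case {
    final String action;
    final IntPredicate filter;

    Case(String action, IntPredicate filter) {
      this.action = action;
      this.filter = filter;
    }
  }

  /** Returns the action of the first case whose filter accepts the id. */
  static Optional<String> select(List<Case> cases, int targetId) {
    for (Case c : cases) {
      if (c.filter.test(targetId)) return Optional.of(c.action);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Case narrow = new Case("UPDATE", id -> id < 100);  // WHEN MATCHED AND id < 100
    Case general = new Case("DELETE", id -> true);     // WHEN MATCHED
    // Narrow case first: the row with id 5 is updated.
    System.out.println(select(Arrays.asList(narrow, general), 5).get());
    // General case first: the same row is deleted, the narrow case never runs.
    System.out.println(select(Arrays.asList(general, narrow), 5).get());
  }
}
```

Under this assumption, a general case placed first shadows every later case in the same group, so the two orderings are not equivalent.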
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Gabor Kaszab has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 6: (7 comments) Thanks for this huge work, Peti! I'm only at the beginning of reviewing, only checked the commit msg and the tests, just dumping what I have now. http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@7 PS6, Line 7: IMPALA-12732: Add support for MERGE statements for Iceberg tables I know MERGE is a known SQL functionality but I'd find it useful to add a few sentences about the functionality itself. http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@11 PS6, Line 11: same syntax as Hive, Spark, and Trino Would you mind adding some examples? http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java File fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java: http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java@264 PS6, Line 264: AnalysisError("merge into " To me this test seems identical to the one right above. Or am I missing something? http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java@384 PS6, Line 384: Column permutation Is this error msg introduced by this patch or did it exist before? I found the word 'permutation' weird here. 'Column list' could be more natural IMO. http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java@386 PS6, Line 386: // Fewer columns in VALUES Actually, this is also a 'More columns in VALUES' case, similar to the one above. 
http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java@410 PS6, Line 410: AnalyzesOk("merge into functional_parquet.iceberg_partitioned target " nit: This test could go above the error cases. Also, a comment about the intention would be great, e.g. 'multiple MATCHED and NOT MATCHED cases'. http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test File testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test: http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test@115 PS6, Line 115: TYPES Are you intentionally not asserting on some profile metrics from this test onward? -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 Gerrit-Change-Number: 21423 Gerrit-PatchSet: 6 Gerrit-Owner: Peter Rozsa Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Gabor Kaszab Gerrit-Reviewer: Noemi Pap-Takacs Gerrit-Reviewer: Peter Rozsa Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Fri, 31 May 2024 14:23:17 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. Patch Set 6: (55 comments) Awesome work, this will be a huge milestone for the Impala project! I did a first round, but I'm planning to do more rounds as this change is massive :) http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@24 PS6, Line 24: level nit: maybe 'phase' is a better word here http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@47 PS6, Line 47: We should also have authorization tests. Also for fine-grained authz, e.g. test that if the source table is masked/filtered, then the MERGE statement cannot be used to expose the sensitive data. http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h File be/src/exec/iceberg-merge-node.h: http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@18 PS6, Line 18: #ifndef IMPALA_EXEC_ICEBERG_MERGE_NODE_H : #define IMPALA_EXEC_ICEBERG_MERGE_NODE_H nit: we prefer #pragma once http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@43 PS6, Line 43: class IcebergMergePlanNode : public PlanNode { Please add class comment http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@101 PS6, Line 101: boost::scoped_ptr We prefer std::unique_ptr http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@158 PS6, Line 158: inline no need for 'inline', member-functions defined inside class definitions are implicitly inline: https://en.cppreference.com/w/cpp/language/inline http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc File be/src/exec/iceberg-merge-node.cc: http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@165 PS6, Line 165: nit: too much indent 
http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@178 PS6, Line 178: last_row_ = nullptr; I'm not sure if we want to reset last_row_ each time EvaluateCases() is invoked. E.g. what happens if output_batch becomes full when then there are duplicates in child_row_batch_? http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@186 PS6, Line 186: return Status( : "Duplicate row found: one target table row matched more than one source row"); Can we also output the duplicated row with PrintTuple()? It would be nice to print both the target tuple and the matching source tuples. http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@209 PS6, Line 209: if (selected_case) { nit: to reduce nesting, we could have if (!selected_case) continue; http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@210 PS6, Line 210: // Add a new row to output_batch : int dst_row_idx = output_batch->AddRow(); : TupleRow* dst_row = output_batch->GetRow(dst_row_idx); : : auto* target_tuple = Tuple::Create( : row_descriptor_.tuple_descriptors()[target_tuple_idx_]->byte_size(), : output_batch->tuple_data_pool()); : auto* merge_action_tuple = Tuple::Create( : row_descriptor_.tuple_descriptors()[merge_action_tuple_idx_]->byte_size(), : output_batch->tuple_data_pool()); : : TIcebergMergeSinkAction::type action = selected_case->SinkAction(); : : dst_row->SetTuple(target_tuple_idx_, target_tuple); : dst_row->SetTuple(merge_action_tuple_idx_, merge_action_tuple); : : for (int i = 0; i < row_descriptor_.tuple_descriptors().size(); i++) { : if (i != target_tuple_idx_ && i != merge_action_tuple_idx_) { : dst_row->SetTuple(i, nullptr); : } : } : : target_tuple->MaterializeExprs(source_row, : *row_descriptor_.tuple_descriptors()[target_tuple_idx_], : selected_case->combined_evaluators_, output_batch->tuple_data_pool()); : : RawValue::WriteNonNullPrimitive(&action, merge_action_tuple, type, nullptr); : : 
output_batch->CommitLastRow(); : IncrementNumRowsReturned(1); nit: for readability, this could go to its own member function http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@248 PS6, Line 248: if (previous_row == nullptr) { : return false; : } nit: fits single line http://gerrit.cloudera.org:8080/#/c/2142
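The concern raised above about resetting last_row_ on every EvaluateCases() call can be shown with a small model. It assumes that duplicate detection compares each row's target identity with the previous row's, and that all rows matching one target row arrive consecutively. The string keys, the class, and the resetPerCall flag are hypothetical and exist only to contrast the two behaviors.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of consecutive-duplicate detection across batches.
final class DuplicateCheckSketch {
  // Identity of the last target row seen; kept between calls on purpose.
  private String lastKey = null;

  /**
   * Returns true if some row has the same target key as the row right before
   * it. With resetPerCall set, the remembered key is dropped at the start of
   * each call, which models resetting last_row_ in every invocation.
   */
  boolean hasDuplicate(List<String> targetKeys, boolean resetPerCall) {
    if (resetPerCall) lastKey = null;
    for (String key : targetKeys) {
      if (key.equals(lastKey)) return true;
      lastKey = key;
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> first = Arrays.asList("row-1", "row-2");
    List<String> second = Arrays.asList("row-2", "row-3");

    DuplicateCheckSketch keepsState = new DuplicateCheckSketch();
    keepsState.hasDuplicate(first, false);
    // The duplicate of row-2 spans the two calls and is detected.
    System.out.println(keepsState.hasDuplicate(second, false));

    DuplicateCheckSketch resets = new DuplicateCheckSketch();
    resets.hasDuplicate(first, true);
    // With a reset per call, the same duplicate goes unnoticed.
    System.out.println(resets.hasDuplicate(second, true));
  }
}
```

In this model a duplicate that straddles two calls is only caught when the remembered row survives between them, which is the situation the comment describes when the output batch fills up mid-way.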
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded a new patch set (#6). ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. IMPALA-12732: Add support for MERGE statements for Iceberg tables This change adds MERGE statement parsing, and an Iceberg-specific semantic analysis, planning, and execution. The parsing grammar follows the SQL standard, it accepts the same syntax as Hive, Spark, and Trino. The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: The analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join condition should be an equi-join, as it is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case. The joining order is forced by a query hint, this guarantees that the target table is always on the left side. A new, IcebergMergeNode is added at planning level, this node does the row-level filtering for each MATCHED/ NOT MATCHED cases. The 'row_present' column decides which case group will be evaluated; if both sides are available, the matched cases, if only the source side matches then the not matched cases and their filter expressions will be evaluated over the row. If one of the cases match, then the execution evaluates the result expressions into the output row batch, and an auxiliary tuple will store the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink will route each incoming row from IcebergMergeNode to their respective destination. Each row could go to the delete sink, insert sink, or to both sinks. 
Target-side duplicate records are filtered during IcebergMergeNode's execution, if one target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user. Added tests: - Parser tests - Analyzer tests - Unit test for WHEN NOT MATCHED INSERT column collation - Planner tests for partitioned/sorted cases - E2E tests Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/exec-node.cc A be/src/exec/iceberg-merge-node.cc A be/src/exec/iceberg-merge-node.h A be/src/exec/iceberg-merge-sink.cc A be/src/exec/iceberg-merge-sink.h M be/src/service/client-request-state.cc M common/thrift/DataSinks.thrift M common/thrift/PlanNodes.thrift M common/thrift/Types.thrift M fe/src/main/cup/sql-parser.cup M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeCase.java A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java A fe/src/main/java/org/apache/impala/analysis/MergeImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M 
fe/src/main/java/org/apache/impala/planner/PlannerContext.java M fe/src/main/java/org/apache/impala/service/Frontend.java M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java M fe/src/main/java/org/apache/impala/util/IcebergUtil.java M fe/src/main/jflex/sql-scanner.flex M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java A fe/src/test/java/org/apache/impala/analysis/MergeInsertTest.java M fe/src/test/java/org/apache/impala/analysis/ParserTest.java M fe/src/test/java/org/apache/impala/planner/PlannerTest.java M shell/impala_shell.py A testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test M tests/query_test/test_iceberg.py 43 files changed, 3,696 insertions(+), 104 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/21423/6 -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Mess
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded a new patch set (#5). ( http://gerrit.cloudera.org:8080/21423 ) Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. IMPALA-12732: Add support for MERGE statements for Iceberg tables This change adds MERGE statement parsing, and an Iceberg-specific semantic analysis, planning, and execution. The parsing grammar follows the SQL standard, it accepts the same syntax as Hive, Spark, and Trino. The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: The analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join condition should be an equi-join, as it is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case. The joining order is forced by a query hint, this guarantees that the target table is always on the left side. A new, IcebergMergeNode is added at planning level, this node does the row-level filtering for each MATCHED/ NOT MATCHED cases. The 'row_present' column decides which case group will be evaluated; if both sides are available, the matched cases, if only the source side matches then the not matched cases and their filter expressions will be evaluated over the row. If one of the cases match, then the execution evaluates the result expressions into the output row batch, and an auxiliary tuple will store the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink will route each incoming row from IcebergMergeNode to their respective destination. Each row could go to the delete sink, insert sink, or to both sinks. 
Target-side duplicate records are filtered during IcebergMergeNode's execution, if one target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user. Added tests: - Parser tests - Analyzer tests - Unit test for WHEN NOT MATCHED INSERT column collation - Planner tests for partitioned/sorted cases - E2E tests Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/exec-node.cc A be/src/exec/iceberg-merge-node.cc A be/src/exec/iceberg-merge-node.h A be/src/exec/iceberg-merge-sink.cc A be/src/exec/iceberg-merge-sink.h M be/src/service/client-request-state.cc M common/thrift/DataSinks.thrift M common/thrift/PlanNodes.thrift M common/thrift/Types.thrift M fe/src/main/cup/sql-parser.cup M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeCase.java A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java A fe/src/main/java/org/apache/impala/analysis/MergeImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M 
fe/src/main/java/org/apache/impala/planner/PlannerContext.java M fe/src/main/java/org/apache/impala/service/Frontend.java M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java M fe/src/main/java/org/apache/impala/util/IcebergUtil.java M fe/src/main/jflex/sql-scanner.flex M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java A fe/src/test/java/org/apache/impala/analysis/MergeInsertTest.java M fe/src/test/java/org/apache/impala/analysis/ParserTest.java M fe/src/test/java/org/apache/impala/planner/PlannerTest.java M shell/impala_shell.py A testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test M tests/query_test/test_iceberg.py 43 files changed, 3,697 insertions(+), 104 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/21423/5 -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Mess
[Impala-ASF-CR] IMPALA-12732: Add support for MERGE statements for Iceberg tables
Peter Rozsa has uploaded this change for review. ( http://gerrit.cloudera.org:8080/21423 Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables .. IMPALA-12732: Add support for MERGE statements for Iceberg tables This change adds MERGE statement parsing, and an Iceberg-specific semantic analysis, planning, and execution. The parsing grammar follows the SQL standard, it accepts the same syntax as Hive, Spark, and Trino. The Iceberg-specific analysis, planning, and execution are based on a concept that was previously used for UPDATE: The analyzer creates a SELECT statement with all target and source columns (including Iceberg's virtual columns) and a 'row_present' column that defines whether the source, the target, or both rows are present in the result set after joining the two table references by the ON clause. The join condition should be an equi-join, as it is a FULL OUTER JOIN, and Impala currently supports only equi-joins in this case. The joining order is forced by a query hint, this guarantees that the target table is always on the left side. A new, IcebergMergeNode is added at planning level, this node does the row-level filtering for each MATCHED/ NOT MATCHED cases. The 'row_present' column decides which case group will be evaluated; if both sides are available, the matched cases, if only the source side matches then the not matched cases and their filter expressions will be evaluated over the row. If one of the cases match, then the execution evaluates the result expressions into the output row batch, and an auxiliary tuple will store the merge action. The merge action is a flag for the newly added IcebergMergeSink; this sink will route each incoming row from IcebergMergeNode to their respective destination. Each row could go to the delete sink, insert sink, or to both sinks. 
Target-side duplicate records are filtered during IcebergMergeNode's execution, if one target table-side duplicate is detected, the whole statement's execution is stopped and the error is reported back to the user. Added tests: - Parser tests - Analyzer tests - Unit test for WHEN NOT MATCHED INSERT column collation - Planner tests for partitioned/sorted cases - E2E tests Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/exec-node.cc A be/src/exec/iceberg-merge-node.cc A be/src/exec/iceberg-merge-node.h A be/src/exec/iceberg-merge-sink.cc A be/src/exec/iceberg-merge-sink.h M be/src/service/client-request-state.cc M common/thrift/DataSinks.thrift M common/thrift/PlanNodes.thrift M common/thrift/Types.thrift M fe/src/main/cup/sql-parser.cup M fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java A fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeCase.java A fe/src/main/java/org/apache/impala/analysis/MergeDelete.java A fe/src/main/java/org/apache/impala/analysis/MergeImpl.java A fe/src/main/java/org/apache/impala/analysis/MergeInsert.java A fe/src/main/java/org/apache/impala/analysis/MergeStmt.java A fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java A fe/src/main/java/org/apache/impala/planner/IcebergMergeSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M 
fe/src/main/java/org/apache/impala/planner/PlannerContext.java M fe/src/main/java/org/apache/impala/service/Frontend.java M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java M fe/src/main/java/org/apache/impala/util/IcebergUtil.java M fe/src/main/jflex/sql-scanner.flex M fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java A fe/src/test/java/org/apache/impala/analysis/MergeInsertTest.java M fe/src/test/java/org/apache/impala/analysis/ParserTest.java M fe/src/test/java/org/apache/impala/planner/PlannerTest.java M shell/impala_shell.py A testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test M tests/query_test/test_iceberg.py 43 files changed, 3,697 insertions(+), 104 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/21423/3 -- To view, visit http://gerrit.cloudera.org:8080/21423 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-M