[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 13: Verified+1 -- To view, visit http://gerrit.cloudera.org:8080/20760 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 13 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Mon, 18 Dec 2023 19:14:20 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. ..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations. Most importantly, it could not update Iceberg tables if any of the following were true:
* UPDATE sets the value of a partitioning column
* UPDATE targets a table that went through partition evolution
* The table has SORT BY properties

The problem with partitions is that the delete record and the new data record might belong to different partitions. Records were shuffled across backends based on the partitions of the delete records, hence the data files might not get written efficiently.

The problem with SORT BY properties is that the position delete files must be written ordered by (file_path, position), while the new data records need to be ordered by the SORT BY columns.

To address these problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. It extracts and aggregates the delete record information from the incoming row batches; then, in FlushFinal, it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251

IcebergBufferedDeleteSink cannot spill to disk, so it can only run if there is enough memory to store the delete records. Paths are stored only once, and the int64_t positions are stored in a vector, so updating 100 million records per node should require around 800 MB for the positions (100 million x 8 bytes) plus the storage for about 100K file paths, roughly 820 MB of memory per node in total. Spilling could be added later, but currently there is no realistic need for it.

Now records can be shuffled based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties.
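The buffering idea described above can be sketched as follows. This is a minimal, hypothetical C++ model, not Impala's IcebergBufferedDeleteSink: the class BufferedPositionDeletes and its methods are invented for illustration, and the sketch assumes a single partition. It stores each file path once as a map key, appends positions to a per-file int64_t vector, and at flush time sorts everything into (file_path, position) order. Rejecting a repeated (file_path, position) pair is shown as one plausible way a sink could perform the duplicate check mentioned in the commit message.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a buffered position-delete sink (single partition).
class BufferedPositionDeletes {
 public:
  // Buffers one delete record. Each distinct file path is stored once, as a
  // map key; every position adds one int64_t to that file's vector.
  void Add(const std::string& file_path, int64_t position) {
    positions_[file_path].push_back(position);
    ++num_records_;
  }

  // Returns all buffered records ordered by (file_path, position) and clears
  // the buffer. std::map keeps the paths sorted; positions are sorted per file.
  // Throws if the same (file_path, position) pair was buffered more than once.
  std::vector<std::pair<std::string, int64_t>> FlushFinal() {
    std::vector<std::pair<std::string, int64_t>> out;
    out.reserve(num_records_);
    for (auto& [path, positions] : positions_) {
      std::sort(positions.begin(), positions.end());
      if (std::adjacent_find(positions.begin(), positions.end()) != positions.end()) {
        throw std::runtime_error("duplicate delete record in " + path);
      }
      for (int64_t pos : positions) out.emplace_back(path, pos);
    }
    assert(out.size() == num_records_);
    positions_.clear();
    num_records_ = 0;
    return out;
  }

  // Bytes held by the buffered positions (element count, not vector capacity).
  std::size_t PositionBytes() const { return num_records_ * sizeof(int64_t); }

 private:
  std::map<std::string, std::vector<int64_t>> positions_;
  std::size_t num_records_ = 0;
};
```

At 8 bytes per position, 100 million buffered records take about 800 MB for the positions alone, in line with the estimate above. The sketch ignores partitions and row-batch handling, which a real sink also has to deal with.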
There's only one case in which we don't allow the UPDATE statement:
* UPDATE sets a partition column AND
* the right-hand side of the assignment is a non-constant expression AND
* the UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN condition could produce multiple matches for the same data record. The duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and no single SINK would see all copies, so the duplicates could not be detected. If any of the above conditions does not hold, the duplicated records (if any) are shuffled together to the same SINK, which can do the duplicate check.

This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase.

Testing:
* planner tests
* e2e tests
* Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Reviewed-on: http://gerrit.cloudera.org:8080/20760
Reviewed-by: Impala Public Jenkins
Tested-by: Impala Public Jenkins
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
35 files changed, 1,951 insertions(+), 349 deletions(-)

Approvals:
  Impala Public Jenkins: Looks good to me, approved; Verified
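The single rejected case in the commit message above can be modeled as a small predicate. This is a hypothetical C++ sketch, not Impala's frontend code (the real check appears to live in the Java analyzer, IcebergUpdateImpl.java); UpdateShape, IsUpdateRejected and RouteToSink are invented names. Following the review discussion in this thread, "has a JOIN" is modeled as the FROM clause holding more than one table ref, since even a plain UPDATE has one table ref for the target table. RouteToSink is a toy stand-in for the exchange that distributes rows by their new partition value.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical description of an UPDATE statement's shape.
struct UpdateShape {
  bool sets_partition_column;  // SET assigns a new value to a partition column
  bool rhs_is_constant;        // that assignment's right-hand side is a constant
  int num_from_table_refs;     // table refs in FROM; the target table is one of them
};

// An UPDATE is rejected only if all three conditions hold.
bool IsUpdateRejected(const UpdateShape& u) {
  const bool has_join = u.num_from_table_refs > 1;
  return u.sets_partition_column && !u.rhs_is_constant && has_join;
}

// Toy exchange: a row goes to the sink chosen by hashing the partition value
// of its new data record. Copies of the same source row that get the same new
// partition value always land in the same sink, where a duplicate check can
// see all of them.
std::size_t RouteToSink(const std::string& new_partition_value, std::size_t num_sinks) {
  assert(num_sinks > 0);
  return std::hash<std::string>{}(new_partition_value) % num_sinks;
}
```

With a constant right-hand side (or an unchanged partition column), every copy of a source row carries the same new partition value, so RouteToSink sends all copies to one sink. With a non-constant right-hand side taken from a joined table, the copies may hash to different sinks, which is exactly the case the predicate rejects.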
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 13: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10063/ DRY_RUN=false -- Gerrit-PatchSet: 13 Gerrit-Comment-Date: Mon, 18 Dec 2023 14:36:03 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 13: Code-Review+2 -- Gerrit-PatchSet: 13 Gerrit-Comment-Date: Mon, 18 Dec 2023 14:36:02 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 12: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14742/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-PatchSet: 12 Gerrit-Comment-Date: Fri, 15 Dec 2023 16:18:28 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 12: (8 comments) Thanks for the comments! http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@28 PS11, Line 28: > How come StringVal has to be redefined here? At some point I had to declare it, but you're right, now it is redundant. http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@138 PS11, Line 138: > nit: empty space Done http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h File be/src/exec/iceberg-delete-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h@49 PS11, Line 49: Status ConstructPartitionInfo(const TupleRow* row, OutputPartition* output_partition); : Status ConstructPartitionInfo(int32_t spec_id, const std::string& partitions, : OutputPartition* output_partition); > nit: this should fit into one line Done http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java: http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@20 PS11, Line 20: import java.util.ArrayList; > unused import Done http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@30 PS11, Line 30: import org.apache.impala.common.AnalysisExc > unused import Done http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@37 PS11, Line 37: import org.apache.impala.thrift.TSortingOrder; > unused import Done 
http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@44 PS11, Line 44: // Id of the delete table in the descriptor t > unused import Done http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java: http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34 PS11, Line 34: IcebergBufferedDeleteSink > For me it looks like that ~40% of this class is identical with IcebergDelet Yeah, actually I want to remove IcebergDeleteSink in the future: IMPALA-12640. Keeping them separate will make the removal easier. -- Gerrit-PatchSet: 12 Gerrit-Comment-Date: Fri, 15 Dec 2023 15:51:30 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#12). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. 35 files changed, 1,951 insertions(+), 349 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/12
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Tamas Mate has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 11: (8 comments) Looks great, Zoltan; just a few minor comments. http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@28 PS11, Line 28: struct StringVal; How come StringVal has to be redefined here? http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@138 PS11, Line 138: nit: empty space http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h File be/src/exec/iceberg-delete-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h@49 PS11, Line 49: Status ConstructPartitionInfo( : const TupleRow* row, : OutputPartition* output_partition); nit: this should fit into one line http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java: http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@20 PS11, Line 20: import static java.lang.String.format; unused import http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@30 PS11, Line 30: import org.apache.impala.catalog.FeFsTable; unused import http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@37 PS11, Line 37: import org.apache.impala.planner.IcebergDeleteSink; unused import http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@44 PS11, Line 44: import com.google.common.collect.ImmutableList; unused import
http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java: http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34 PS11, Line 34: IcebergBufferedDeleteSink For me it looks like that ~40% of this class is identical with IcebergDeleteSink. Do you think it would be worth inheriting from the IcebergDeleteSink class instead of TableSink? -- Gerrit-PatchSet: 11 Gerrit-Comment-Date: Fri, 15 Dec 2023 14:27:50 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 6: (1 comment) Thanks for the review, Daniel! http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34 PS6, Line 34: TableSink > Could you open a Jira for this? Opened IMPALA-12640. -- Gerrit-PatchSet: 6 Gerrit-Comment-Date: Fri, 15 Dec 2023 12:56:17 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Daniel Becker has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 11: Code-Review+1 (1 comment) http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34 PS6, Line 34: TableSink > Ok, if IcebergDeleteSink will probably be deleted we can leave it as it is Could you open a Jira for this? -- Gerrit-PatchSet: 11 Gerrit-Comment-Date: Fri, 15 Dec 2023 12:46:37 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 11: Verified+1 -- Gerrit-PatchSet: 11 Gerrit-Comment-Date: Wed, 13 Dec 2023 20:22:46 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 11: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14706/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-PatchSet: 11 Gerrit-Comment-Date: Wed, 13 Dec 2023 15:36:09 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 11: (5 comments) Thanks for the comments! http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h File be/src/exec/table-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h@90 PS10, Line 90: must already have > Nit: "must already have filled". Done http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115 PS6, Line 115: In case of a JOIN, and if duplicated rows ar > It is a bit nit-picky, I meant that in the sentence "If there are duplicate Updated the comment. http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@126 PS6, Line 126: se_.size() > 1) > I wanted to ask if it is possible that modifyStmt_.fromClause_.size() == 1. Even 'UPDATE tbl SET val = 3;' has a fromClause_ (maybe the null checking is redundant, but I think it should be fine), and it has a single tableRef, which is for the target table 'tbl'. Updated the error message. http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test File testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test: http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test@400 PS10, Line 400: 1 > Are these changes compared to PS7 because of a rebase? No, this is because of the new INSERT INTO in functional_schema_template.sql.
http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test File testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test: http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@252 PS7, Line 252: FROM clause > I asked because I'm unsure whether we should add "multiple tables" to the c Done -- Gerrit-PatchSet: 11 Gerrit-Comment-Date: Wed, 13 Dec 2023 15:14:22 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 11: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10025/ DRY_RUN=true -- Gerrit-PatchSet: 11 Gerrit-Comment-Date: Wed, 13 Dec 2023 15:14:45 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#11). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations, most importantly it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and new data record might belong to different partitions and records are shuffled across based on the partitions of the delete records, hence the data files might not get written efficiently. The problem with SORT BY properties, is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 IcebergBufferedDeleteSink cannot spill to disk, so it can only run if there's enough memory to store the delete records. Paths are stored only once, and the int64_t positions are stored in a vector, so updating 100 Million records per node should require around 800MBs + (100K) filepaths ~= 820 MBs of memory per node. Spilling could be added later, but currently the need for it is not too realistic. Now records can get shuffled around based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties. 
There's only one case we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions meet, it would be possible to have an incorrect JOIN condition that has multiple matches for the data records, then the duplicated records would be shuffled independently (based on the new partition value) to different backend SINKs, and the different backend SINK would not be able to detect the duplicates. If any of the above conditions was false, then the duplicated records would be shuffled together to the same SINK, that could do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M 
fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 35 files changed, 1,960 insertions(+), 345 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/11 -- Gerrit-Project: Impala-ASF Gerrit-Br
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Daniel Becker has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 10: (9 comments) http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc File be/src/exec/iceberg-delete-sink.cc: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@79 PS5, Line 79: VerifyRowsNotDuplicated > file paths and positions are not sorted across partitions. So we would need Ok, it can stay as it is. http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h File be/src/exec/table-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h@90 PS10, Line 90: must already fill Nit: "must already have filled". http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115 PS6, Line 115: If there are duplicates in the JOIN operator > I'm not sure what is the point here. Duplicates are only possible in the co It is a bit nit-picky; I meant that in the sentence "If there are duplicates [...] then we cannot do duplicate checking in the SINK if ..." the condition at the beginning is not necessary: even if there happen to be no duplicates, we still can't check for them if the rows are shuffled independently. I'd suggest something like this: """ In case of a JOIN, if duplicated rows can be shuffled independently, we cannot do duplicate checking in the SINK. This is the case when ... """ http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@126 PS6, Line 126: via UPDATE FROM > There will be always at least one tableRef because of the target table.
I wanted to ask if it is possible that modifyStmt_.fromClause_.size() == 1. 1. If it is possible, then in that case the exception (currently) won't be thrown. 1a) If it should be thrown we should remove that condition. 1b) Otherwise, the error message lists the conditions that were needed to trigger the error: - partition column, - non-constant RHS -> in this case we should include "more than one table ref in the FROM clause" as well 2. If modifyStmt_.fromClause_.size() == 1 is not possible, we should remove the relevant part of the condition on L123 and add a precondition check instead. http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java File fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java@101 PS6, Line 101: public TSortingOrder getSortingOrder() { > There's good chance we will need it later, e.g. optimizing a table that has Ok, it should stay then. http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34 PS6, Line 34: TableSink > It may have some value now, as there are some common fields/methods, but I' Ok, if IcebergDeleteSink will probably be deleted we can leave it as it is now. But we should open a Jira about it then. 
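A minimal sketch of the rejection rule from the commit message, written in C++ for consistency with the other examples even though the real check lives in the Java frontend (IcebergUpdateImpl.java). The struct UpdateShape and the function IsUpdateAllowed are hypothetical; has_join stands in for the 'modifyStmt_.fromClause_.size() > 1' condition discussed above, and the sketch does not settle whether a single table ref in the FROM clause is possible.

```cpp
#include <cassert>

// Hypothetical summary of an UPDATE statement's shape (not Impala's frontend types).
struct UpdateShape {
  bool assigns_partition_column;  // SET assigns a value to a partition column
  bool rhs_is_constant;           // that assignment's right-hand side is constant
  bool has_join;                  // stand-in for "more than one table ref in FROM"
};

// Rejected only when all three conditions hold: then a faulty JOIN could duplicate
// a target row with different new partition values, the copies would be hashed to
// different sinks, and no single sink could see both. If any condition is false,
// the copies reach the same sink, which can run the duplicate check.
bool IsUpdateAllowed(const UpdateShape& s) {
  const bool rejected =
      s.assigns_partition_column && !s.rhs_is_constant && s.has_join;
  return !rejected;
}
```

Keeping the rule as a single conjunction mirrors the commit message's wording ("AND" between the three bullets), which makes it easy to check against the error message that lists the triggering conditions.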
http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql File testdata/datasets/functional/functional_schema_template.sql: http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407 PS7, Line 3407: E TA > Makes sense, I never really thought about this as I usually re-load my tabl I agree, let's not make this patch even bigger. http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test File testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test: http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test@400 PS10, Line 400: 1 Are these changes compared to PS7 because of a rebase? http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test File testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test: http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@252 PS7, Line 252: FROM clause > I think yes, otherwise you cannot have a join that produces duplicates. I asked because I'm unsure whether we should add "mult
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 10: Verified+1 -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 10 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Wed, 13 Dec 2023 01:08:43 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 10: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14679/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 10 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 20:37:55 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#10). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations; most importantly, it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and the new data record might belong to different partitions, while records are shuffled based on the partitions of the delete records, so the data files might not be written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 IcebergBufferedDeleteSink cannot spill to disk, so it can only run if there's enough memory to store the delete records. Paths are stored only once, and the int64_t positions are stored in a vector, so updating 100 million records per node should require around 800 MB (8 bytes per position) plus the memory for (100K) file paths, ~= 820 MB per node. Spilling could be added later, but currently there is no realistic need for it. With this patch, records can be shuffled based on the partition values of the new data records, and the SORT operator sorts the records based on the SORT BY properties.
There's only one case where we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, an incorrect JOIN condition could have multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and none of these SINKs would be able to detect the duplicates. If any of the above conditions is false, the duplicated records are shuffled together to the same SINK, which can do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M
fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 35 files changed, 1,956 insertions(+), 344 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/10 -- Gerrit-Project: Impala-ASF Gerrit-Br
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 10: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10008/ DRY_RUN=true -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 10 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 20:10:41 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 9: Verified-1 Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/10004/ -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 9 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 19:33:03 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 9: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/14675/ : Initial code review checks failed. See linked job for details on the failure. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 9 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 19:19:48 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 8: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/14674/ : Initial code review checks failed. See linked job for details on the failure. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 8 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 19:19:32 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 9: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10004/ DRY_RUN=true -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 9 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 19:12:05 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 8: (19 comments) Thanks for the comments! http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h@56 PS6, Line 56: /// Writes the buffered records to position delete files in the correct oreder. > We could add "after sorting" or "in the correct order". Done http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72 PS5, Line 72: /// Nested iterator class to conveniently iterate over a FilePositions object. > I feel that it is cleaner to check it here - it is a precondition of this c Added DCHECK(!file_level_it_.second.empty()) checks, as these state most directly what we want to ensure. Also added DCHECK(pos_level_it != file_level_it_.second.end()) to check that we haven't messed up anything in NextPos()/NextFile().
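A minimal sketch of such a nested iterator, assuming FilePositions is an ordered map from file path to a non-empty vector of positions (the alias below is an assumption, not the real type). FilePositionsIterator and its single Next() method are hypothetical; the review mentions separate NextPos()/NextFile() methods in the real class, and assert() stands in for Impala's DCHECK.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Assumed layout for this sketch: data-file path -> deleted positions in that file.
using FilePositions = std::map<std::string, std::vector<int64_t>>;

// Hypothetical iterator over (file, position) pairs; assert() plays the role of DCHECK.
class FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePositions& fp)
      : fp_(fp), file_it_(fp.begin()) {
    if (file_it_ != fp_.end()) {
      // Precondition: no file is stored with an empty position list.
      assert(!file_it_->second.empty());
      pos_it_ = file_it_->second.begin();
    }
  }

  bool AtEnd() const { return file_it_ == fp_.end(); }

  const std::string& File() const { return file_it_->first; }

  int64_t Pos() const {
    // Catches bugs in Next() that leave the position iterator past the end.
    assert(pos_it_ != file_it_->second.end());
    return *pos_it_;
  }

  // Moves to the next position, or to the first position of the next file once
  // the current file's positions are exhausted.
  void Next() {
    assert(!AtEnd());
    if (++pos_it_ != file_it_->second.end()) return;
    if (++file_it_ == fp_.end()) return;
    assert(!file_it_->second.empty());
    pos_it_ = file_it_->second.begin();
  }

 private:
  const FilePositions& fp_;
  FilePositions::const_iterator file_it_;
  std::vector<int64_t>::const_iterator pos_it_;
};
```

Checking non-emptiness at the point where a new file is entered keeps the invariant local: Pos() can then rely on the position iterator always being valid while AtEnd() is false.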
http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h File be/src/exec/iceberg-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h@66 PS5, Line 66: /// having the same filepath + position), because that would corrupt the table data > I'd still consider adding a reference to the comment in IcebergUpdateImpl.j Added a reference. http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc File be/src/exec/iceberg-delete-sink.cc: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@79 PS5, Line 79: VerifyRowsNotDuplicated > Can we add DCHECKs for (prev_file_path_, filepath) and (prev_position_, pos file paths and positions are not sorted across partitions. So we would need to reset them whenever we create new partitions. Planner tests check the presence of the SORT node and sort expressions. I'd rather not modify code that is working now. Also, I think we might retire IcebergDeleteSink anyway and always use IcebergBufferedDeleteWriter, because I feel it should be much faster (I need to run some measurements first), and we wouldn't need to maintain two classes for the same functionality. We would lose spilling capabilities, but that might not be needed anyway. http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@121 PS5, Line 121: RETURN_IF_ERROR(ConstructPartitionInfo(row, current_partition_.first.get())); > What is the reason for taking this out of InitOutputPartition()? This method has different signatures in the child classes (IcebergBufferedDeleteSink), so we cannot do a virtual call in InitOutputPartition() anymore. Child classes are now responsible for creating the current partition object and passing it to InitOutputPartition.
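A minimal sketch of the adjacency-based duplicate check and the ordering DCHECK discussed above. It assumes records reach the check sorted by (file_path, position) within each partition; because that order does not hold across partitions, the remembered previous record is cleared when a new partition starts. DuplicateDeleteChecker, StartNewPartition() and Check() are hypothetical names; the real VerifyRowsNotDuplicated() works on row batches, while this sketch returns a bool and uses assert() in place of DCHECK.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <tuple>

// Hypothetical sketch (not Impala code) of duplicate detection for position deletes.
class DuplicateDeleteChecker {
 public:
  // Sort order only holds inside one partition, so forget the previous record.
  void StartNewPartition() { has_prev_ = false; }

  // Returns false if (file_path, position) equals the previous record.
  bool Check(const std::string& file_path, int64_t position) {
    // Ordering check in the spirit of the suggested DCHECKs; it is only valid
    // because the previous record is reset at partition boundaries.
    assert(!has_prev_ ||
           std::tie(prev_file_path_, prev_position_) <= std::tie(file_path, position));
    const bool duplicate =
        has_prev_ && prev_position_ == position && prev_file_path_ == file_path;
    prev_file_path_ = file_path;
    prev_position_ = position;
    has_prev_ = true;
    return !duplicate;
  }

 private:
  bool has_prev_ = false;
  std::string prev_file_path_;
  int64_t prev_position_ = 0;
};
```

Because the input is sorted, a duplicate can only be the immediately preceding record, so the check needs constant memory per partition instead of a set of all seen (file_path, position) pairs.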
http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h File be/src/exec/table-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h@90 PS6, Line 90: The caller of this function must already > We don't have 'row' anymore. We could add that the partition key is now sup Updated the comment. http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc File be/src/exec/table-sink-base.cc: http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc@365 PS6, Line 365: OutputPartition > Is there a reason for using separate OutputPartition and vector parameters The vector only makes sense for the clustered writers, where the vector denotes the values that belong to the current partition in the row batch. In IcebergBufferedDeleteSink the vector is not needed, as the created row batches always have rows of a single partition. I felt that PartitionPair* is a somewhat awkward data structure, and it was a bit strange that WriteRowsToPartition() modifies its vector. http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115 PS6, Line 115: If there are duplicates in the JOIN operator > Strictly speaking, we cannot check for duplicates even if there are none, s I'm not sure what is the point here. Duplicates are only possible in the context of JOINs. Otherwise, a source statement like SELECT * FROM target_table WHERE ; should never duplicate rows AFAICT. But I'm happy to rephrase the comment if you have a suggestion. http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdate
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#9). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations; most importantly, it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and the new data record might belong to different partitions, while records are shuffled based on the partitions of the delete records, so the data files might not be written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 IcebergBufferedDeleteSink cannot spill to disk, so it can only run if there's enough memory to store the delete records. Paths are stored only once, and the int64_t positions are stored in a vector, so updating 100 million records per node should require around 800 MB (8 bytes per position) plus the memory for (100K) file paths, ~= 820 MB per node. Spilling could be added later, but currently there is no realistic need for it. With this patch, records can be shuffled based on the partition values of the new data records, and the SORT operator sorts the records based on the SORT BY properties.
There's only one case where we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, an incorrect JOIN condition could have multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and none of these SINKs would be able to detect the duplicates. If any of the above conditions is false, the duplicated records are shuffled together to the same SINK, which can do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M
fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 35 files changed, 1,956 insertions(+), 344 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/9 -- Gerrit-Project: Impala-ASF Gerrit-Bran
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#8). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations; most importantly, it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and the new data record might belong to different partitions, while records are shuffled based on the partitions of the delete records, so the data files might not be written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 IcebergBufferedDeleteSink cannot spill to disk, so it can only run if there's enough memory to store the delete records. Paths are stored only once, and the int64_t positions are stored in a vector, so updating 100 million records per node should require around 800 MB (8 bytes per position) plus the memory for (100K) file paths, ~= 820 MB per node. Spilling could be added later, but currently there is no realistic need for it. With this patch, records can be shuffled based on the partition values of the new data records, and the SORT operator sorts the records based on the SORT BY properties.
There's only one case where we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, an incorrect JOIN condition could have multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and none of these SINKs would be able to detect the duplicates. If any of the above conditions is false, the duplicated records are shuffled together to the same SINK, which can do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M
fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 35 files changed, 1,956 insertions(+), 344 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/8 -- Gerrit-Project: Impala-ASF Gerrit-Bran
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Daniel Becker has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 7: (14 comments) Finally gone through all of the files. http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h File be/src/exec/table-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h@90 PS6, Line 90: The partition key is derived from 'row'. We don't have 'row' anymore. We could add that the partition key is now supposed to be available in 'output_partition' (if this is correct). http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc File be/src/exec/table-sink-base.cc: http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc@365 PS6, Line 365: OutputPartition Is there a reason for using separate OutputPartition and vector parameters instead of PartitionPair? And not clearing the vector at the end of this function? http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115 PS6, Line 115: If there are duplicates in the JOIN operator Strictly speaking, we cannot check for duplicates even if there are none, so this clause is not needed, though it is useful to mention that the situation described below happens if there is a JOIN. http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@126 PS6, Line 126: via UPDATE FROM 'modifyStmt_.fromClause_.size() > 1' suggests if there is only one table (or view) in the FROM clause then it should be ok. Is that viable? 
http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java File fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java@101 PS6, Line 101: public TSortingOrder getSortingOrder() { Wouldn't it be better to not have 'getSortingOrder()' in DmlStatementBase but at a lower level in the class hierarchy? Or would it make sense for OptimizeStmt too, and it's just not implemented yet? http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java: http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34 PS6, Line 34: TableSink Would it make sense to have an IcebergDeleteSinkBase in the Java code too? Some functions and fields seem to be the same or very similar here and in IcebergDeleteSink. http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java File fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java: http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java@76 PS7, Line 76: 1024L * 1024L We could use ByteUnits.MEGABYTE, but see comment at IcebergBufferedDeleteSink.java:34. http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/Planner.java@926 PS7, Line 926: get 'partitionKeyExprs'?
http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql File testdata/datasets/functional/functional_schema_template.sql: http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407 PS7, Line 3407: INTO Shouldn't this be an INSERT OVERWRITE TABLE, like for example on L3469? If we re-load the table without first deleting it then it will add extra rows. http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407 PS7, Line 3407: iceberg_partition_transforms_zorder Shouldn't this be '{table_name}'? http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test: http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test@340 PS7, Line 340: from iceberg_partition_transforms_zorder ice_zorder, iceberg_partitioned source Just curious: can we also use INNER JOIN syntax here? http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test File test
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 6: Verified-1 Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/10002/ -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 6 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 15:58:54 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Daniel Becker has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 6: (8 comments) With this partial review I'm clear from the beginning up to iceberg-delete-sink.cc. http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h@56 PS6, Line 56: /// Writes the buffered records to position delete files. We could add "after sorting" or "in the correct order". http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72 PS5, Line 72: /// Nested iterator class to conveniently iterate over a FilePositions object. > Here this would need another iteration over the map (though only in debug m I feel that it is cleaner to check it here - it is a precondition of this class that there should be no empty vectors. An alternative to the extra loop would be to check that the vector is not empty whenever we call file_level_it_->second.begin(), i.e. on L75 and L103. We could either add DCHECK(!file_level_it_->second.empty()) before these calls or DCHECK(pos_level_it_ != file_level_it_->second.end()) afterwards. Or, even simpler, we could add DCHECK(pos_level_it_ != file_level_it_->second.end()) before L84 in Next(), i.e. before dereferencing 'pos_level_it_', with an error message about the empty vector. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h File be/src/exec/iceberg-delete-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@30 PS4, Line 30: IcebergDeleteSinkBase > I'm not sure what do you mean by that. It is currently an abstract class as Right, I missed it.
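The precondition discussed above (no empty position vector in the map) can be illustrated with a small, hypothetical C++ version of the nested iterator. `FilePositions` is assumed here to be a `std::map` from file path to a vector of `int64_t` positions, and `assert` stands in for `DCHECK`; the check sits right before the dereference in `Next()`, which is the simplest of the alternatives mentioned. This is a sketch, not the class from the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical buffer layout: file path -> row positions in that file.
// std::map keeps the file paths in sorted order.
using FilePositions = std::map<std::string, std::vector<int64_t>>;

// Iterates over all (file_path, position) pairs in map order.
// Precondition: no position vector in the map is empty.
class FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePositions& file_pos)
    : file_pos_(file_pos), file_it_(file_pos.begin()) {
    if (file_it_ != file_pos_.end()) pos_it_ = file_it_->second.begin();
  }

  bool HasNext() const { return file_it_ != file_pos_.end(); }

  // Returns the current pair and advances to the next one.
  std::pair<std::string, int64_t> Next() {
    assert(HasNext());
    // Stand-in for the DCHECK: fails if the current file's vector is empty.
    assert(pos_it_ != file_it_->second.end() && "empty position vector");
    std::pair<std::string, int64_t> result(file_it_->first, *pos_it_);
    if (++pos_it_ == file_it_->second.end()) {
      // Move on to the next file, if any.
      if (++file_it_ != file_pos_.end()) pos_it_ = file_it_->second.begin();
    }
    return result;
  }

 private:
  const FilePositions& file_pos_;
  FilePositions::const_iterator file_it_;
  std::vector<int64_t>::const_iterator pos_it_;
};
```

With this placement, an empty vector for any file is caught on the first `Next()` call that reaches that file, without an extra pass over the map.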
http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc File be/src/exec/iceberg-delete-sink-base.cc: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@66 PS4, Line 66: TIcebergPartitionTransformType::type transform_type, const std::string& value, > I don't really find the comment misleading. Subclasses need to check it and Right, I somehow assumed that superclass methods are automatically called like in the case of destructors. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@112 PS4, Line 112: // non void partition names and transforms. > Yeah, currently it is checked twice when the first overload is being used, Ok, it's indeed a cheap check. http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h File be/src/exec/iceberg-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h@66 PS5, Line 66: /// having the same filepath + position), because that would corrupt the table data > That case is handled in IcebergUpdateImpl.java, so I added the comment ther I'd still consider adding a reference to the comment in IcebergUpdateImpl.java, for example "For a case where deduplication is not possible at the sink level, see the comment in IcebergUpdateImpl::buildAndValidateSelectExprs()". http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc File be/src/exec/iceberg-delete-sink.cc: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@79 PS5, Line 79: VerifyRowsNotDuplicated Can we add DCHECKs for (prev_file_path_, filepath) and (prev_position_, position) to check that the data is already sorted? http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@121 PS5, Line 121: RETURN_IF_ERROR(ConstructPartitionInfo(row, current_partition_.first.get())); What is the reason for taking this out of InitOutputPartition()? 
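One way to express the sortedness DCHECKs suggested for `VerifyRowsNotDuplicated` is sketched below in C++. This is a hypothetical helper, not the sink's actual code: it assumes the input arrives ordered by (file_path, position) with byte-wise string comparison, keeps only the previous pair, asserts the ordering, and reports an exact repeat as a duplicate. Because the input is sorted, a duplicate can only appear immediately after its twin, so remembering one previous pair is enough.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper modelled on the review suggestion for
// VerifyRowsNotDuplicated(): input is expected in (file_path, position)
// order, so a duplicate can only follow its twin directly.
class SortedDeleteChecker {
 public:
  // Returns false if (file_path, position) repeats the previous record.
  bool Check(const std::string& file_path, int64_t position) {
    if (has_prev_) {
      // Ordering checks (stand-ins for the suggested DCHECKs).
      assert(prev_file_path_ <= file_path);
      assert(prev_file_path_ != file_path || prev_position_ <= position);
      if (prev_file_path_ == file_path && prev_position_ == position) return false;
    }
    prev_file_path_ = file_path;
    prev_position_ = position;
    has_prev_ = true;
    return true;
  }

 private:
  bool has_prev_ = false;
  std::string prev_file_path_;
  int64_t prev_position_ = -1;
};
```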
-- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 6 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 14:15:48 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 7: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14669/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 7 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 13:16:20 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 7: (1 comment) PS7 is a rebase (apart from fixing a long line). http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-delete-sink-base.cc File be/src/exec/iceberg-delete-sink-base.cc: http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-delete-sink-base.cc@167 PS6, Line 167: if (i < partition_values_decoded.size() - 1) { > line too long (91 > 90) Done -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 7 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 12:43:57 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#7). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations, most importantly, it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and new data record might belong to different partitions and records are shuffled across based on the partitions of the delete records, hence the data files might not get written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 IcebergBufferedDeleteSink cannot spill to disk, so it can only run if there's enough memory to store the delete records. Paths are stored only once, and the int64_t positions are stored in a vector, so updating 100 million records per node should require around 800 MB + (100K) file paths ~= 820 MB of memory per node. Spilling could be added later, but currently the need for it is not too realistic. Now records can get shuffled around based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties.
There's only one case in which we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, it would be possible to have an incorrect JOIN condition that has multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and the different backend SINKs would not be able to detect the duplicates. If any of the above conditions were false, the duplicated records would be shuffled together to the same SINK, which could do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M
fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 34 files changed, 1,882 insertions(+), 334 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/7 -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I2bb97b4454165a292975d88dc9
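The buffering scheme described in this commit message can be sketched as follows. This is an illustrative C++ model under stated assumptions, not the IcebergBufferedDeleteSink implementation: `BufferedDeletes`, `Add()` and `SortedRecords()` are hypothetical names. Each file path is stored once as a `std::map` key (so paths come back in sorted order), positions are appended unsorted, and each file's positions are sorted only when the records are read out, which corresponds to the FlushFinal step.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of the buffer: each file path is stored once as a map
// key, and its positions are appended to a vector in arrival order.
class BufferedDeletes {
 public:
  // Per incoming delete record (roughly the Send() side).
  void Add(const std::string& file_path, int64_t position) {
    assert(position >= 0);
    buffer_[file_path].push_back(position);
  }

  // Returns the buffered records in (file_path, position) order (roughly the
  // FlushFinal() side). The map already iterates paths in sorted order, so
  // only each file's positions need sorting.
  std::vector<std::pair<std::string, int64_t>> SortedRecords() {
    std::vector<std::pair<std::string, int64_t>> out;
    for (auto& entry : buffer_) {
      std::vector<int64_t>& positions = entry.second;
      std::sort(positions.begin(), positions.end());
      for (int64_t pos : positions) out.emplace_back(entry.first, pos);
    }
    return out;
  }

 private:
  std::map<std::string, std::vector<int64_t>> buffer_;
};
```

Records can arrive in any order, for example ("b.parq", 3), ("a.parq", 9), ("b.parq", 1), ("a.parq", 2), and come back as a/2, a/9, b/1, b/3, which is the order position delete files need.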
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 6: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14665/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 6 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 11:55:00 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 6: (21 comments) Thanks for the comments! http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG@26 PS5, Line 26: FlushFinal > Can it spill to disk if the data it has received can't be stored in memory? It cannot spill, though it's not realistic to have that many delete records buffered. I've added a paragraph about it. http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72 PS5, Line 72: /// Nested iterator class to conveniently iterate over a FilePositions object. > We could also DCHECK that none of the vectors in the map are empty either. Here this would need another iteration over the map (though only in debug mode). Instead of this I added a DCHECK to VerifyBufferedRecords(). http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@91 PS5, Line 91: /// that SortBufferedRecords() has been called already. > This condition should always be true, we've just dereferenced 'pos_level_it Done http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h@35 PS4, Line 35: /// IcebergBufferedDeleteSink buffers the Iceberg position delete records in its > There could be a comment describing when this class is/should be used, e.g. Added comment. 
http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@45 PS3, Line 45: > Does 'partition_descriptor_map_' exist? Updated the comments. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48 PS3, Line 48: state, MemTracker* parent_m > I can't see that it has changed in this file, though it did change in icebe Yeah, sorry, updated the wrong comments. Apart from Prepare/Open/Send/FlushFinal/Close I didn't copy legacy comments. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69 PS3, Line 69: /// It's necessary to use a std::map so we can get back the file paths in order. > The typedefs could stay here, the class could be forward-declared here and You're right, I didn't think of that. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc File be/src/exec/iceberg-buffered-delete-sink.cc: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc@345 PS4, Line 345: int capacity = batch->capacity(); > See comment at iceberg-delete-sink-base.cc:66. Done http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h File be/src/exec/iceberg-delete-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@30 PS4, Line 30: IcebergDeleteSinkBase > Could we make this class pure virtual? I'm not sure what you mean by that. It is currently an abstract class as it doesn't override everything from DataSink, e.g. Send(). http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@63 PS4, Line 63: TIcebergPartitionTransformType::type transform_type, const std::string& value, > An example would be useful. Added comment.
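The point about abstract classes can be shown with a tiny, hypothetical C++ hierarchy; the names only loosely mirror DataSink, IcebergDeleteSinkBase and IcebergBufferedDeleteSink. A class that inherits a pure virtual `Send()` without overriding it is already abstract, without an `= 0` of its own. The sketch also shows the related `Close()` pattern from this review thread: an overriding `Close()` has to call the base version explicitly, because, unlike destructors, base-class overrides are not invoked automatically. Setting `closed_ = true` in the base is an assumed convention for the sketch, not a quote of Impala's code.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical miniature of the sink hierarchy (names are illustrative).
class SinkBase {
 public:
  virtual ~SinkBase() = default;
  virtual void Send() = 0;  // Pure virtual: concrete sinks must define it.
  virtual void Close() {
    if (closed_) return;
    closed_ = true;  // Assumed convention: the base marks the sink closed.
  }
  bool closed() const { return closed_; }

 protected:
  bool closed_ = false;
};

// Does not override Send(), so it is abstract without any "= 0" of its own.
class DeleteSinkBase : public SinkBase {
 public:
  void Close() override {
    if (closed_) return;
    // ... release resources owned by this layer ...
    SinkBase::Close();  // Must be called explicitly; unlike destructors,
                        // base-class overrides are not chained automatically.
  }
};

class BufferedDeleteSink : public DeleteSinkBase {
 public:
  void Send() override {}
};

static_assert(std::is_abstract<DeleteSinkBase>::value,
              "inheriting an unoverridden pure virtual keeps the class abstract");
static_assert(!std::is_abstract<BufferedDeleteSink>::value,
              "overriding Send() makes the class concrete");
```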
http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@70 PS4, Line 70: > Does the unbuffered IcebergDeleteSink also need a separate DmlExecState? Is Moved it to IcebergBufferedDeleteSink. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc File be/src/exec/iceberg-delete-sink-base.cc: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@66 PS4, Line 66: TIcebergPartitionTransformType::type transform_type, const std::string& value, > The 'closed_' field is a bit strange, AFAICS it comes from DataSink and the I don't really find the comment misleading. Subclasses need to check it and set it to false either directly or indirectly via SuperClass::Close(state); I'd rather not modify that comment in this CR. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@88 PS4, Line 88: ScalarExprEvaluator* spec_id_eval = partition_key_expr_evals_[0]; > Nit: doesn't it fit on the previous line? Done http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@100 PS4, Line 100: _ > Why is it plural? Maybe it could be 'parti
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#6). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations, most importantly, it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and new data record might belong to different partitions and records are shuffled across based on the partitions of the delete records, hence the data files might not get written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 IcebergBufferedDeleteSink cannot spill to disk, so it can only run if there's enough memory to store the delete records. Paths are stored only once, and the int64_t positions are stored in a vector, so updating 100 million records per node should require around 800 MB + (100K) file paths ~= 820 MB of memory per node. Spilling could be added later, but currently the need for it is not too realistic. Now records can get shuffled around based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties.
There's only one case in which we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, it would be possible to have an incorrect JOIN condition that has multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and the different backend SINKs would not be able to detect the duplicates. If any of the above conditions were false, the duplicated records would be shuffled together to the same SINK, which could do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M
fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 34 files changed, 1,881 insertions(+), 334 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/6 -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I2bb97b4454165a292975d88dc9
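The memory estimate in the commit message is simple arithmetic, reproduced below as a hedged C++ helper. `EstimateBufferedBytes()` is a hypothetical name, and `avg_path_bytes` is an assumed per-path cost; about 200 bytes per path for 100K paths is what turns the quoted 800 MB into 820 MB. The sketch ignores `std::vector` over-allocation from capacity growth and any container overhead beyond that per-path figure, so real usage could be somewhat higher.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical back-of-the-envelope estimate: one int64_t per buffered
// position plus each distinct file path once. 'avg_path_bytes' is an assumed
// per-path cost, not a measured value; vector over-allocation is ignored.
constexpr int64_t EstimateBufferedBytes(int64_t num_positions, int64_t num_files,
                                        int64_t avg_path_bytes) {
  return num_positions * static_cast<int64_t>(sizeof(int64_t)) +
         num_files * avg_path_bytes;
}

// 100 million positions * 8 bytes = 800,000,000 bytes, i.e. about 800 MB.
static_assert(EstimateBufferedBytes(100000000, 0, 0) == 800000000,
              "positions alone take about 800 MB");
```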
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 6: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10002/ DRY_RUN=true -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 6 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 11:29:47 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Daniel Becker has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 5: (7 comments) Follow-up for my first partial review. http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG@26 PS5, Line 26: FlushFinal Can it spill to disk if the data it has received can't be stored in memory? http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72 PS5, Line 72: DCHECK(!file_pos.empty()); We could also DCHECK that none of the vectors in the map are empty either. http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@91 PS5, Line 91: if (pos_level_it_ != file_level_it_->second.end()) { This condition should always be true, we've just dereferenced 'pos_level_it_' on L84 before calling this function. We should DCHECK it instead. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@45 PS3, Line 45: partition_descriptor_map_ Does 'partition_descriptor_map_' exist? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48 PS3, Line 48: to the temporary Hdfs files > Sorry, this was copy-pasted, updated the comments. I can't see that it has changed in this file, though it did change in iceberg-delete-sink.h. Maybe those changes were intended here? Also, the other function descriptions that were copied here may contain legacy comments? 
http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69 PS3, Line 69: class FilePositionsIterator { > This way I can keep the typedefs private. The typedefs could stay here, the class could be forward-declared here and defined in the .cc file. http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h File be/src/exec/iceberg-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h@66 PS5, Line 66: /// and the delete files as well. We could mention the case when the different deletes go to different sinks because of the differing new values and that we give an error in this case because we can't check it. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 5 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Tue, 12 Dec 2023 10:34:01 + Gerrit-HasComments: Yes
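The suggestion to forward-declare the iterator in the header and define it in the .cc file, while keeping the typedefs private, could look like the hypothetical C++ sketch below. Both halves are shown in one snippet; `BufferedSink`, `Add()`, `CountRecords()` and `Count()` are illustrative names, and the trivial `Count()` body stands in for the real iteration logic. A nested class is a member of its enclosing class, so its out-of-line definition can still use the private typedefs.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// ---- Header part (hypothetical names) ----
class BufferedSink {
 public:
  void Add(const std::string& path, int64_t pos) {
    file_positions_[path].push_back(pos);
  }
  int64_t CountRecords() const;

 private:
  // The typedefs remain private to the class.
  typedef std::vector<int64_t> PositionVector;
  typedef std::map<std::string, PositionVector> FilePositions;

  // Only forward-declared here; the definition can live in the .cc file.
  class FilePositionsIterator;

  FilePositions file_positions_;
};

// ---- .cc part ----
// As a member of BufferedSink, the nested class may use its private typedefs.
class BufferedSink::FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePositions& fp) : fp_(fp) {}
  // Trivial placeholder for the real iteration: counts all positions.
  int64_t Count() const {
    int64_t n = 0;
    for (const auto& entry : fp_) n += static_cast<int64_t>(entry.second.size());
    return n;
  }

 private:
  const FilePositions& fp_;
};

int64_t BufferedSink::CountRecords() const {
  return FilePositionsIterator(file_positions_).Count();
}
```

Code that includes only the header part can use `BufferedSink` without seeing the iterator's definition, which keeps the header smaller.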
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Daniel Becker has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 5: (14 comments) Yet another partial review until be/src/exec/iceberg-delete-sink-base.cc http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h@35 PS4, Line 35: class IcebergBufferedDeleteSink : public IcebergDeleteSinkBase { There could be a comment describing when this class is/should be used, e.g. what the difference between this and IcebergDeleteSink is. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc File be/src/exec/iceberg-buffered-delete-sink.cc: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc@345 PS4, Line 345: void IcebergBufferedDeleteSink::Close(RuntimeState* state) { See comment at iceberg-delete-sink-base.cc:66. Possibly we could convert it into DCHECK(closed_). http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h File be/src/exec/iceberg-delete-sink-base.h: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@30 PS4, Line 30: IcebergDeleteSinkBase Could we make this class pure virtual? http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@63 PS4, Line 63: std::string HumanReadablePartitionValue( An example would be useful. Could you explain what 'transform_result' is? How should it be interpreted and how should the return value be interpreted if 'transform_status' is an error? http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@70 PS4, Line 70: DmlExecState dml_exec_state_; Does the unbuffered IcebergDeleteSink also need a separate DmlExecState? Is that used for updates too? 
Or are updates always handled by the buffered IcebergBufferedDeleteSink? http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc File be/src/exec/iceberg-delete-sink-base.cc: http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@66 PS4, Line 66: if (closed_) return; The 'closed_' field is a bit strange: AFAICS it comes from DataSink and the comment there says that "subclasses should check and set this in Close()." But in DataSink::Close() 'closed_' is set to true. Then some subclasses like TableSinkBase and this one don't set it while others like IcebergBufferedDeleteSink do. I think either the comment at 'DataSink::closed_' should be updated (if DataSink is responsible for setting 'closed_') OR we should have all concrete subclasses set it (and make intermediate classes that are never instantiated into pure virtual classes). http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@88 PS4, Line 88: OutputPartition* output_partition) { Nit: doesn't it fit on the previous line? http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@100 PS4, Line 100: s Why is it plural? Maybe it could be 'partition_values_eval', like the argument on L111. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@104 PS4, Line 104: s See L100. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@112 PS4, Line 112: if (partition_key_expr_evals_.empty()) { Optional: if this function is called from the other ConstructPartitionInfo() overload, this check has already been done. We could extract the rest of this function into a helper function that is called by both "top level" functions.
http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@113 PS4, Line 113: output_partition->iceberg_spec_id = table_desc_->IcebergSpecId(); Is there a valid use case for calling this function (and providing a 'spec_id') when 'partition_key_expr_evals_' is empty? Can 'spec_id' be different from 'table_desc_->IcebergSpecId()'? If not, we should add a DCHECK; if they can be different, it's a bit unintuitive. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@159 PS4, Line 159: stringstream external_partition_name_ss; Is this ever written? http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@164 PS4, Line 164: encoded It could be 'url_encoded_...': "encoded" is ambiguous, as it could also refer to Base64 encoding in this context. http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@169 PS4, Line 169: string& Is it intentional that we overwrite the elements of 'partition_values_decoded' on L171? I think it's a bit less clean than having an independent variable here, though it may be faster.
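The two options Daniel proposes for 'closed_' can be compared on a small hierarchy. The sketch below shows one possible shape of the first option (the base class owns the flag, concrete sinks only check it and delegate); `SinkBase` and `ConcreteSink` are hypothetical names, `assert()` stands in for DCHECK, and this is not Impala's actual DataSink code.

```cpp
#include <cassert>

// Hypothetical stand-in for DataSink: the base Close() owns and sets 'closed_'.
class SinkBase {
 public:
  virtual ~SinkBase() = default;
  virtual void Close() {
    if (closed_) return;
    closed_ = true;
  }
  bool closed() const { return closed_; }

 protected:
  bool closed_ = false;
};

// Hypothetical concrete sink: checks 'closed_' first so that a second Close()
// is a no-op, releases its own resources, then delegates to the base class,
// which sets the flag.
class ConcreteSink : public SinkBase {
 public:
  void Close() override {
    if (closed_) return;
    ++release_count_;  // stands in for releasing buffers, writers, etc.
    SinkBase::Close();
    assert(closed_);   // corresponds to a DCHECK(closed_) after the base call
  }
  int release_count() const { return release_count_; }

 private:
  int release_count_ = 0;
};
```

With this shape, intermediate classes never touch the flag, so the comment on the base member would only need to say that subclasses check it and call the base Close().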
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 5: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14643/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 5 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate Gerrit-Reviewer: Zoltan Borok-Nagy Gerrit-Comment-Date: Mon, 11 Dec 2023 16:07:07 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 5: (26 comments) Thanks for the comments! http://gerrit.cloudera.org:8080/#/c/20760/4//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/20760/4//COMMIT_MSG@9 PS4, Line 9: most > nit: I would say "mainly" or "most importantly" Done http://gerrit.cloudera.org:8080/#/c/20760/4//COMMIT_MSG@23 PS4, Line 23: patch int > nit: just patch Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48 PS3, Line 48: to the temporary Hdfs files > It can be misunderstood, one may think that we write _into_ Hdfs files, lik Sorry, this was copy-pasted, updated the comments. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69 PS3, Line 69: class FilePositionsIterator { > Can this be put into the .cc file? This way I can keep the typedefs private. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@74 PS3, Line 74: file_level_end_ = file_pos.end(); > Shouldn't we check whether file_pos is empty? Added DCHECK that file_pos is not empty. We should only invoke this if we have positions. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@78 PS3, Line 78: bool HasNext() { return file_level_it_ != file_level_end_; } > We should check for emptiness. At this point 'file_level_it_' should always point to a valid entry. I'll merge Get() and Next(), so the semantics will be clearer.
http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@83 PS3, Line 83: StringValue filepath = file_level_it_->first; > Does HasNext() mean we can call Get() or that there is a valid item after t I merged the Get() and Next(), so the semantics are clearer. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@96 PS3, Line 96: } > A comment on Next() could indicate that Next() shouldn't be called when ite Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@98 PS3, Line 98: > After incrementing 'file_level_it_' on the previous line, it may now point Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@121 PS3, Line 121: /// Sorts the buffered records. > We could include in the comment that it should be called after SortBuffered Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@151 PS3, Line 151: > Nit: serve? Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc File be/src/exec/iceberg-buffered-delete-sink.cc: http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@124 PS3, Line 124: DCHECK_NOTN > We could DCHECK that filepath_sv is not NULL. Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@149 PS3, Line 149: D_TI > Optional: to ease understanding, we could use actual types instead of auto In the for-loop we have std::pair<..., ...> entries; using the actual types there might not necessarily improve readability that much, so I tried to use self-explanatory names. In the bodies where it really matters, I changed the types. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@161 PS3, Line 161: gstr > See L149. I kept entry as is, but introduced a new variable 'PartitionInfo& part_info' in the body.
http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@184 PS3, Line 184: > See L149. See above. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@196 PS3, Line 196: v_pos); > We could use 'file' for 'file_and_pos.first' here and with the Len() too. Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@218 PS3, Line 218: i > Isn't it only one partition? Maybe 'partition_encoded' would be more unders Done http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@273 PS3, Line 273: // With partition evolution it's possible that we have the same partition names > How is this comment connected to the next line? Could you explain? Added additional sentence. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@288 PS3, Line 288: > Shouldn't it be '*buffer'? Good catch, thanks. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@320 PS3, Line 320: > Just curious, can filepath_ref->GetTupleIdx() be different from position_re It can't, as it comes from
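The compromise Zoltan describes for the loops in iceberg-buffered-delete-sink.cc (keep `auto&` for the map entry, but bind a named, explicitly typed reference in the body) looks roughly like this. `PartitionInfo` here is a hypothetical two-field struct, not the struct used in the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified per-partition buffer.
struct PartitionInfo {
  std::vector<int64_t> positions;
  int64_t num_deleted = 0;
};

// Records the number of buffered positions per partition and returns the total.
// 'entry' stays 'auto&' (spelled out it would be
// std::pair<const std::string, PartitionInfo>&), but the body works through a
// named reference with an explicit type.
int64_t FinalizePartitions(std::map<std::string, PartitionInfo>& partitions) {
  int64_t total = 0;
  for (auto& entry : partitions) {
    PartitionInfo& part_info = entry.second;
    part_info.num_deleted = static_cast<int64_t>(part_info.positions.size());
    total += part_info.num_deleted;
  }
  return total;
}
```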
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#5). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations, most importantly it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and new data record might belong to different partitions and records are shuffled across based on the partitions of the delete records, hence the data files might not get written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 Now records can get shuffled around based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties. There's only one case in which we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, it would be possible to have an incorrect JOIN condition that has multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and the different backend SINKs would not be able to detect the duplicates.
If any of the above conditions was false, then the duplicated records would be shuffled together to the same SINK, that could do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A 
testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 34 files changed, 1,882 insertions(+), 334 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/5 -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 5 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Tamas Mate
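The buffering described in the commit message (aggregate the delete records per data file, then order them by (file_path, position) before writing) can be sketched with standard containers. This is a minimal sketch under assumptions: `DeleteBuffer` is a hypothetical class, paths are `std::string` rather than StringValue, partitions, memory tracking and the actual file writing are omitted, and, as in the patch, nothing spills to disk.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical buffer: each file path is stored once as a map key, and the
// int64_t positions are appended to a per-file vector.
class DeleteBuffer {
 public:
  void Add(const std::string& file_path, int64_t pos) {
    file_positions_[file_path].push_back(pos);
  }

  // Sorts the positions of each file. std::map already keeps the file paths
  // ordered, so the whole buffer ends up ordered by (file_path, position).
  void Sort() {
    for (auto& entry : file_positions_) {
      std::sort(entry.second.begin(), entry.second.end());
    }
  }

  // Must be called after Sort(): duplicate positions are then adjacent.
  bool HasDuplicates() const {
    for (const auto& entry : file_positions_) {
      const std::vector<int64_t>& positions = entry.second;
      if (std::adjacent_find(positions.begin(), positions.end()) != positions.end()) {
        return true;
      }
    }
    return false;
  }

  // Flattens the buffer into the order in which the position delete records
  // would be written.
  std::vector<std::pair<std::string, int64_t>> Flatten() const {
    std::vector<std::pair<std::string, int64_t>> out;
    for (const auto& entry : file_positions_) {
      for (int64_t pos : entry.second) out.emplace_back(entry.first, pos);
    }
    return out;
  }

 private:
  std::map<std::string, std::vector<int64_t>> file_positions_;
};
```

Keeping each path only once as a key, and only an int64_t per buffered record, is what keeps the per-record cost near 8 bytes plus container overhead; sorting each file's vector and relying on the map's key order yields (file_path, position) order without comparing paths record by record.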
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 4: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14625/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 4 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Daniel Becker Gerrit-Reviewer: Impala Public Jenkins Gerrit-Comment-Date: Fri, 08 Dec 2023 17:18:10 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Daniel Becker has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 3: (19 comments) Thanks Zoltán, great patch! So far I've only been able to review it up until iceberg-buffered-delete-sink.cc. http://gerrit.cloudera.org:8080/#/c/20760/2//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/20760/2//COMMIT_MSG@26 PS2, Line 26: FlushFinal Can it spill to disk if the data it has received can't be stored in memory? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h File be/src/exec/iceberg-buffered-delete-sink.h: http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48 PS3, Line 48: to the temporary Hdfs files It can be misunderstood: one may think that we write _into_ Hdfs files, like spilling to disk. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69 PS3, Line 69: class FilePositionsIterator { Can this be put into the .cc file? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@74 PS3, Line 74: pos_level_it_ = file_level_it_->second.begin(); Shouldn't we check whether file_pos is empty? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@78 PS3, Line 78: StringValue filepath = file_level_it_->first; We should check for emptiness. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@83 PS3, Line 83: bool HasNext() { return file_level_it_ != file_level_end_; } Does HasNext() mean we can call Get() or that there is a valid item after the current one? In the first case, the name is easy to misunderstand.
If it means that there is a valid item after the current one, I think there's an off-by-one error: when we're currently at the last file position of the last file, both 'file_level_it_' and 'pos_level_it_' point to valid fields (HasNext() returns true) but there is no valid next element. See also L98. The semantics of Get(), Next() and HasNext() could be clarified. We could also eliminate Get() and have Next() return the next value. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@96 PS3, Line 96: DCHECK(file_level_it_ != file_level_end_); A comment on Next() could indicate that Next() shouldn't be called when iteration is over. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@98 PS3, Line 98: file_level_it_ After incrementing 'file_level_it_' on the previous line, it may now point past-the-end, so we can't dereference it. See also L83. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@121 PS3, Line 121: /// Verifies that there are no duplicates in the buffered delete records. We could include in the comment that it should be called after SortBufferedRecords(). http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@151 PS3, Line 151: server Nit: serve? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc File be/src/exec/iceberg-buffered-delete-sink.cc: http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@124 PS3, Line 124: filepath_sv We could DCHECK that filepath_sv is not NULL. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@149 PS3, Line 149: auto Optional: to ease understanding, we could use actual types instead of auto in these loops. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@161 PS3, Line 161: auto See L149.
http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@184 PS3, Line 184: auto See L149. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@196 PS3, Line 196: file_and_pos.first We could use 'file' for 'file_and_pos.first' here and with the Len() too. http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@218 PS3, Line 218: s Isn't it only one partition? Maybe 'partition_encoded' would be more understandable? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@273 PS3, Line 273: // we might need to delete rows from partition "col_trunc=1000" with both spec ids. How is this comment connected to the next line? Could you explain? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@288 PS3, Line 288: buffer Shouldn't it be '*buffer'? http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@320 PS3, Line 320: int64_t* pos_slot = row->GetTuple(position_ref->GetTupleIdx())-> Just curious, can filepath_ref->GetTupleIdx() be different from posit
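The iterator semantics discussed in this review (HasNext() means a valid element can be returned, Get() merged into Next(), no dereferencing of a past-the-end file-level iterator, and a DCHECK that no position vector is empty) can be sketched as follows. The class mirrors the names from the review but is a hypothetical simplification: it uses `std::string` and `assert()` instead of StringValue and DCHECK, and it is not the code that was merged.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

using FilePositions = std::map<std::string, std::vector<int64_t>>;

// Two-level iterator over (file_path, position) pairs. The map must outlive
// the iterator. Invariants:
//  - no position vector in the map is empty (checked in the constructor);
//  - HasNext() is true iff Next() can return a valid element; it does NOT
//    mean "there is an element after the current one".
class FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePositions& file_pos)
      : file_it_(file_pos.begin()), file_end_(file_pos.end()), pos_it_() {
    for (const auto& entry : file_pos) assert(!entry.second.empty());
    if (file_it_ != file_end_) pos_it_ = file_it_->second.begin();
  }

  bool HasNext() const { return file_it_ != file_end_; }

  // Returns the current element and advances. Must not be called when
  // HasNext() is false.
  std::pair<std::string, int64_t> Next() {
    assert(HasNext());
    std::pair<std::string, int64_t> result(file_it_->first, *pos_it_);
    ++pos_it_;
    if (pos_it_ == file_it_->second.end()) {
      ++file_it_;
      // Only dereference 'file_it_' if it is not past-the-end.
      if (file_it_ != file_end_) pos_it_ = file_it_->second.begin();
    }
    return result;
  }

 private:
  FilePositions::const_iterator file_it_;
  FilePositions::const_iterator file_end_;
  std::vector<int64_t>::const_iterator pos_it_;
};
```

A typical loop is `while (it.HasNext()) { auto rec = it.Next(); ... }`; because Next() both returns and advances, there is no separate Get() whose validity the caller has to reason about.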
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#4). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations, i.e. it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and new data record might belong to different partitions and records are shuffled across based on the partitions of the delete records, hence the data files might not get written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch set introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 Now records can get shuffled around based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties. There's only one case in which we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, it would be possible to have an incorrect JOIN condition that has multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and the different backend SINKs would not be able to detect the duplicates.
If any of the above conditions was false, then the duplicated records would be shuffled together to the same SINK, that could do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. Testing: * planner tests * e2e tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/datasets/functional/functional_schema_template.sql M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test A 
testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test M tests/query_test/test_iceberg.py M tests/stress/test_update_stress.py 34 files changed, 1,869 insertions(+), 327 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/4 -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 4 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Impala Public Jenkins
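The disallowed case in the commit message can be simulated in a few lines. The sketch assumes the join produced the same delete record (file, pos) twice, routes rows to sinks by their new partition value (a deterministic stand-in for the real hash exchange), and lets each sink check only its own rows; `UpdateRow` and `AnySinkDetectsDuplicate` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// One row produced by the UPDATE's join: the delete record (file, pos) plus
// the new partition value computed for the updated data row.
struct UpdateRow {
  std::string file;
  int64_t pos;
  std::string new_partition;
};

// Returns true if any sink detects a duplicate (file, pos) among its own rows.
bool AnySinkDetectsDuplicate(const std::vector<UpdateRow>& rows, size_t num_sinks) {
  assert(num_sinks > 0);
  // Stand-in for hash partitioning on the new partition value: distinct values
  // get consecutive sink ids (in sorted order, modulo num_sinks) so that the
  // routing is deterministic.
  std::map<std::string, size_t> sink_of;
  for (const UpdateRow& row : rows) sink_of.emplace(row.new_partition, 0);
  size_t next_id = 0;
  for (auto& entry : sink_of) entry.second = next_id++ % num_sinks;

  std::vector<std::vector<std::pair<std::string, int64_t>>> sinks(num_sinks);
  for (const UpdateRow& row : rows) {
    sinks[sink_of.at(row.new_partition)].emplace_back(row.file, row.pos);
  }
  // Each sink can only check the delete records it received.
  for (auto& records : sinks) {
    std::sort(records.begin(), records.end());
    if (std::adjacent_find(records.begin(), records.end()) != records.end()) {
      return true;
    }
  }
  return false;
}
```

With a constant right-hand side both copies carry the same new partition value and meet in one sink, so the duplicate is caught; with a non-constant one they can be split across sinks, and neither sink sees it.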
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 3: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14614/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 3 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Impala Public Jenkins Gerrit-Comment-Date: Thu, 07 Dec 2023 15:36:21 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20760 to look at the new patch set (#3). Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. Part 2 had some limitations, i.e. it could not update Iceberg tables if any of the following were true: * UPDATE value of partitioning column * UPDATE table that went through partition evolution * Table has SORT BY properties The problem with partitions is that the delete record and new data record might belong to different partitions and records are shuffled across based on the partitions of the delete records, hence the data files might not get written efficiently. The problem with SORT BY properties is that we need to write the position delete files ordered by (file_path, position). To address the above problems, this patch set introduces a new backend operator: IcebergBufferedDeleteSink. This new operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251 Now records can get shuffled around based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties. There's only one case in which we don't allow the UPDATE statement: * UPDATE partition column AND * Right-hand side of assignment is non-constant expression AND * UPDATE statement has a JOIN When all of the above conditions are met, it would be possible to have an incorrect JOIN condition that has multiple matches for the data records; the duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and the different backend SINKs would not be able to detect the duplicates.
If any of the above conditions was false, then the duplicated records would be shuffled together to the same SINK, that could do the duplicate check. This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase. TODO: * planner tests * e2e tests * concurrent tests * Impala/Hive interop tests Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 --- M be/src/exec/CMakeLists.txt M be/src/exec/data-sink.cc M be/src/exec/hdfs-table-sink.cc M be/src/exec/hdfs-table-sink.h A be/src/exec/iceberg-buffered-delete-sink.cc A be/src/exec/iceberg-buffered-delete-sink.h A be/src/exec/iceberg-delete-sink-base.cc A be/src/exec/iceberg-delete-sink-base.h A be/src/exec/iceberg-delete-sink-config.cc A be/src/exec/iceberg-delete-sink-config.h M be/src/exec/iceberg-delete-sink.cc M be/src/exec/iceberg-delete-sink.h M be/src/exec/table-sink-base.cc M be/src/exec/table-sink-base.h M be/src/exprs/slot-ref.h M be/src/runtime/dml-exec-state.h M common/thrift/DataSinks.thrift M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java M fe/src/main/java/org/apache/impala/planner/Planner.java M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test 29 files changed, 1,168 insertions(+), 323 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/3
-- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 3 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Impala Public Jenkins
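Partition evolution, one of the Part 2 limitations listed in this commit message, means the same partition name can exist under more than one partition spec; the review comment on iceberg-buffered-delete-sink.cc line 273 mentions having to delete rows from partition "col_trunc=1000" with both spec ids. A minimal sketch of why the grouping key then needs the spec id as well as the name; `PartitionKey`, `DeleteRecord` and `GroupBySpecAndPartition` are hypothetical and not taken from the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical key: (partition spec id, partition name). Keying on the name
// alone would merge delete records that must be written under different specs.
using PartitionKey = std::pair<int32_t, std::string>;
using PositionList = std::vector<std::pair<std::string, int64_t>>;

struct DeleteRecord {
  int32_t spec_id;
  std::string partition;
  std::string file;
  int64_t pos;
};

// Groups delete records per (spec_id, partition); each group would be written
// to its own position delete file(s).
std::map<PartitionKey, PositionList> GroupBySpecAndPartition(
    const std::vector<DeleteRecord>& records) {
  std::map<PartitionKey, PositionList> groups;
  for (const DeleteRecord& r : records) {
    groups[PartitionKey(r.spec_id, r.partition)].emplace_back(r.file, r.pos);
  }
  return groups;
}
```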
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 ) Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables. .. Patch Set 2: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14601/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315 Gerrit-Change-Number: 20760 Gerrit-PatchSet: 2 Gerrit-Owner: Zoltan Borok-Nagy Gerrit-Reviewer: Impala Public Jenkins Gerrit-Comment-Date: Wed, 06 Dec 2023 18:05:03 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#2).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations; in particular, it could not update Iceberg tables if any of the following were true:
* UPDATE the value of a partitioning column
* UPDATE a table that went through partition evolution
* The table has SORT BY properties

The problem with partitions is that the delete record and the new data record might belong to different partitions, and records are shuffled based on the partitions of the delete records, so the new data files might not be written efficiently.

The problem with SORT BY properties is that the position delete files must be written ordered by (file_path, position), while the new data records have to be sorted by the SORT BY columns.

To address the above problems, this patch set introduces a new backend operator: IcebergBufferedDeleteSink. This operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251

Now records can be shuffled based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties.

There is only one case in which the UPDATE statement is not allowed:
* UPDATE a partition column AND
* the right-hand side of the assignment is a non-constant expression AND
* the UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN condition could produce multiple matches for the same data record. The duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and no single SINK would be able to detect the duplicates.
If any of the above conditions were false, the duplicated records would be shuffled together to the same SINK, which could then do the duplicate check.

This patch also moves some code from IcebergDeleteSink to the newly introduced IcebergDeleteSinkBase.

TODO:
* planner tests
* e2e tests
* concurrent tests
* Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
26 files changed, 1,122 insertions(+), 251 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/2
--
To view, visit http://gerrit.cloudera.org:8080/20760
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 2
Gerrit-Owner: Zoltan Borok-Nagy
Gerrit-Reviewer: Impala Public Jenkins
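The toy C++ model below illustrates why the one remaining case (partition column updated to a non-constant value, with a JOIN) is rejected. It makes several hypothetical assumptions:
* Rows are routed to sinks by a plain modulo of an integer new partition value, as a deterministic stand-in for a hash exchange.
* `UpdateRow`, `RouteByNewPartition` and `AnySinkDetectsDuplicate` are invented names.

When a bad JOIN duplicates a target row and the non-constant right-hand side gives the two copies different partition values, the copies can land on different sinks, and neither sink sees both.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Toy model of one UPDATE output row: the delete record identifying the old
// row, plus the partition value computed for the new data record.
struct UpdateRow {
  std::string file_path;  // data file that contains the old row
  int64_t pos;            // position of the old row in that file
  uint64_t new_part;      // partition value of the new data record
};

// Hypothetical stand-in for a hash exchange on the new partition value;
// a plain modulo keeps the routing deterministic for the example.
size_t RouteByNewPartition(const UpdateRow& row, size_t num_sinks) {
  return static_cast<size_t>(row.new_part % num_sinks);
}

// Each sink only sees the rows routed to it, so a duplicate (file_path, pos)
// is detected only if both copies were routed to the same sink.
bool AnySinkDetectsDuplicate(const std::vector<UpdateRow>& rows,
                             size_t num_sinks) {
  assert(num_sinks > 0);
  std::vector<std::set<std::pair<std::string, int64_t>>> seen(num_sinks);
  for (const UpdateRow& row : rows) {
    size_t sink = RouteByNewPartition(row, num_sinks);
    // emplace(...).second is false if this sink already saw the same row.
    if (!seen[sink].emplace(row.file_path, row.pos).second) return true;
  }
  return false;
}
```

With two sinks, copies with partition values 2023 and 2024 are routed to different sinks, so the duplicate goes unnoticed. Two copies that both get 2024, as a constant right-hand side would produce, meet in the same sink and are caught.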
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

Patch Set 1: Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14599/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/20760
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 1
Gerrit-Owner: Zoltan Borok-Nagy
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Comment-Date: Wed, 06 Dec 2023 15:05:03 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
Zoltan Borok-Nagy has uploaded this change for review. ( http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations; in particular, it could not update Iceberg tables if any of the following were true:
* UPDATE the value of a partitioning column
* UPDATE a table that went through partition evolution
* The table has SORT BY properties

The problem with partitions is that the delete record and the new data record might belong to different partitions, and records are shuffled based on the partitions of the delete records, so the new data files might not be written efficiently.

The problem with SORT BY properties is that the position delete files must be written ordered by (file_path, position), while the new data records have to be sorted by the SORT BY columns.

To address the above problems, this patch set introduces a new backend operator: IcebergBufferedDeleteSink. This operator extracts and aggregates the delete record information from the incoming row batches, then in FlushFinal it orders the position delete records and writes them out to files. This mechanism is similar to Hive's approach: https://github.com/apache/hive/pull/3251

Now records can be shuffled based on the new data records' partition values, and the SORT operator sorts the records based on the SORT BY properties.

There is only one case in which the UPDATE statement is not allowed:
* UPDATE a partition column AND
* the right-hand side of the assignment is a non-constant expression AND
* the UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN condition could produce multiple matches for the same data record. The duplicated records would then be shuffled independently (based on the new partition value) to different backend SINKs, and no single SINK would be able to detect the duplicates.
If any of the above conditions were false, the duplicated records would be shuffled together to the same SINK, which could then do the duplicate check.

TODO:
* planner tests
* e2e tests
* concurrent tests
* Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
26 files changed, 1,123 insertions(+), 251 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/1
--
To view, visit http://gerrit.cloudera.org:8080/20760
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 1
Gerrit-Owner: Zoltan Borok-Nagy
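The toy C++ model below illustrates the partitioning problem of Part 2 described in this message. It makes several hypothetical assumptions:
* Rows are routed to sinks by a modulo of an integer partition value, standing in for a hash exchange.
* `PartitionPair` and `NewPartitionsPerSink` are invented names.

For each sink, the model counts how many distinct partitions' data files that sink would have to write. It compares routing by the delete records' (old) partitions with routing by the new data records' partitions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

// Toy model of an updated row: the partition of the deleted (old) record and
// the partition its new data record belongs to after the UPDATE.
struct PartitionPair {
  uint64_t old_part;
  uint64_t new_part;
};

// For each sink, count the distinct partitions it must write new data files
// for. Rows are routed by old_part (shuffling on the delete records'
// partitions) or by new_part. A plain modulo stands in for hashing.
std::vector<size_t> NewPartitionsPerSink(const std::vector<PartitionPair>& rows,
                                         size_t num_sinks, bool route_by_new) {
  assert(num_sinks > 0);
  std::vector<std::set<uint64_t>> parts(num_sinks);
  for (const PartitionPair& row : rows) {
    uint64_t key = route_by_new ? row.new_part : row.old_part;
    parts[static_cast<size_t>(key % num_sinks)].insert(row.new_part);
  }
  std::vector<size_t> counts;
  for (const std::set<uint64_t>& p : parts) counts.push_back(p.size());
  return counts;
}
```

Consider four rows that all come from old partition 0 but move to new partitions 0 to 3, with four sinks:
* Routing by the old partition sends everything to one sink, which must write files for four partitions: {4, 0, 0, 0}.
* Routing by the new partition gives each sink exactly one: {1, 1, 1, 1}.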