Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit
    http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#6).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
......................................................................

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations; most importantly, it could not update
Iceberg tables if any of the following were true:
 * UPDATE sets the value of a partitioning column
 * UPDATE targets a table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and the new data
record might belong to different partitions, and records are shuffled
based on the partitions of the delete records, so the data files might
not get written efficiently.

The problem with SORT BY properties is that we need to write the
position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new backend
operator: IcebergBufferedDeleteSink. This new operator extracts and
aggregates the delete record information from the incoming row
batches, then in FlushFinal it orders the position delete records and
writes them out to files. This mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

IcebergBufferedDeleteSink cannot spill to disk, so it can only run if
there is enough memory to store the delete records. Paths are stored
only once, and the int64_t positions are stored in a vector, so
updating 100 million records per node should require around 800 MB for
the positions (100M x 8 bytes) plus the storage for the file paths
(~100K paths), roughly 820 MB of memory per node in total. Spilling
could be added later, but currently the need for it seems unlikely.

Now records can get shuffled based on the partition values of the new
data records, and the SORT operator sorts the records based on the
SORT BY properties.
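
The buffering idea described above can be sketched roughly as follows.
This is a minimal, hypothetical C++ model, not the actual
IcebergBufferedDeleteSink: the class name `BufferedPositionDeletes` and
its methods are illustrative assumptions, and the sketch ignores
partitioning, row batches, memory tracking and file writing. Each
distinct file path is stored once as a `std::map` key, with its deleted
positions in a `std::vector<int64_t>`. At flush time the positions are
sorted per file, and because `std::map` iterates its keys in ascending
order, the output comes out ordered by (file_path, position). The sketch
also assumes a duplicate check that rejects the same
(file_path, position) pair appearing twice; once the positions are
sorted, this is a cheap adjacent-element comparison.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified model of a buffered position-delete collector.
// Each distinct data file path is stored once (as a map key); the deleted
// row positions of that file are appended to a vector of int64_t.
class BufferedPositionDeletes {
 public:
  // Records that row `pos` of data file `file_path` was deleted.
  void Add(const std::string& file_path, int64_t pos) {
    assert(pos >= 0);  // Row positions are 0-based file offsets.
    deletes_[file_path].push_back(pos);
  }

  // Sorts the positions of each file and returns all records ordered by
  // (file_path, position). std::map already iterates its keys in ascending
  // order, so only the per-file vectors need sorting.
  // Throws std::runtime_error if a (file_path, position) pair was added twice.
  std::vector<std::pair<std::string, int64_t>> FlushSorted() {
    std::vector<std::pair<std::string, int64_t>> out;
    for (auto& [path, positions] : deletes_) {
      std::sort(positions.begin(), positions.end());
      // After sorting, duplicate positions are adjacent.
      if (std::adjacent_find(positions.begin(), positions.end()) !=
          positions.end()) {
        throw std::runtime_error("duplicate delete record for " + path);
      }
      for (int64_t pos : positions) out.emplace_back(path, pos);
    }
    deletes_.clear();
    return out;
  }

  // Rough payload size: the characters of each path (stored once) plus
  // 8 bytes per buffered position. Container overhead is ignored.
  std::size_t EstimatedPayloadBytes() const {
    std::size_t bytes = 0;
    for (const auto& [path, positions] : deletes_) {
      bytes += path.size() + positions.size() * sizeof(int64_t);
    }
    return bytes;
  }

 private:
  std::map<std::string, std::vector<int64_t>> deletes_;
};
```

`EstimatedPayloadBytes()` follows the same back-of-the-envelope
arithmetic as the estimate above: 100 million positions at 8 bytes each
is 800 MB before any container overhead, so the path strings add
comparatively little as long as the number of distinct data files stays
modest.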
There is only one case where we don't allow the UPDATE statement:
 * UPDATE sets a partition column AND
 * the right-hand side of the assignment is a non-constant expression AND
 * the UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN condition
could produce multiple matches for the same data record. The duplicated
records would then be shuffled independently (based on the new
partition value) to different backend SINKs, and none of those SINKs
would be able to detect the duplicates. If any of the above conditions
were false, the duplicated records would be shuffled together to the
same SINK, which can do the duplicate check.

This patch also moves some code from IcebergDeleteSink to the newly
introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
34 files changed, 1,881 insertions(+), 334 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/6
--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 6
Gerrit-Owner: Zoltan Borok-Nagy <borokna...@cloudera.com>
Gerrit-Reviewer: Daniel Becker <daniel.bec...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Tamas Mate <tma...@apache.org>
Gerrit-Reviewer: Zoltan Borok-Nagy <borokna...@cloudera.com>