Hello Andrew Sherman, Tamas Mate, Daniel Becker, Gabor Kaszab, Noemi Pap-Takacs, Impala Public Jenkins,
I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20677

to look at the new patch set (#10).

Change subject: IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables
......................................................................

IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables

This patch adds limited UPDATE support for Iceberg tables. The
limitations mean users cannot update Iceberg tables if any of
the following is true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The above limitations will be resolved by part 3. The usual limitations
like writing non-Parquet files, using copy-on-write, modifying V1 tables
are out of scope of IMPALA-12313.

This patch implements UPDATEs with the merge-on-read technique. This
means the UPDATE statement writes both data files and delete files.
Data files contain the updated records, delete files contain the
position delete records of the old data records that have been
touched.

To achieve the above this patch introduces a new sink: MultiDataSink.
We can configure multiple TableSinks for a single MultiDataSink object.
During execution, the row batches sent to the MultiDataSink will be
forwarded to all the TableSinks that have been registered.

The UPDATE statement for an Iceberg table creates a source select
statement with all table columns and virtual columns INPUT__FILE__NAME
and FILE__POSITION. E.g. imagine we have a table 'tbl' with schema
(i int, s string, k int), and we update the table with:

  UPDATE tbl SET k = 5 WHERE i % 100 = 11;

  The generated source statement will be ==>

  SELECT i, s, 5, INPUT__FILE__NAME, FILE__POSITION
  FROM tbl WHERE i % 100 = 11;

Then we create two table sinks that refer to expressions from the above
source statement:

  Insert sink (i, s, 5)
  Delete sink (INPUT__FILE__NAME, FILE__POSITION)

The tuples in the rowbatch of MultiDataSink contain slots for all the
above expressions (i, s, 5, INPUT__FILE__NAME, FILE__POSITION).
MultiDataSink forwards each row batch to each registered TableSink.
They will pick their relevant expressions from the tuple and write
data/delete files. The tuples are sorted by INPUT__FILE__NAME and
FILE__POSITION because we need to write the delete records in this
order.

For partitioned tables we need to shuffle and sort the input tuples.
In this case we also add virtual columns "PARTITION__SPEC__ID" and
"ICEBERG__PARTITION__SERIALIZED" to the source statement and shuffle
and sort the rows based on them.

Data files and delete files are now separated in the DmlExecState, so
at the end of the operation we'll have two sets of files. We use these
two sets to create a new Iceberg snapshot.

Why does this patch have the limitations?
 - Because we are shuffling and sorting rows based on the delete
   records and their partitions. This means that the new data files
   might not get written in an efficient way, e.g. there will be
   too many of them, or we will need to keep too many open file
   handles during writing.
   Also, if the table has SORT BY properties, we cannot respect
   it as the input rows are ordered in a way to favor the position
   deletes.
   Part 3 will introduce a buffering writer for position delete
   files. This means we will shuffle and sort records based on
   the data records' partitions and SORT BY properties while
   delete records get buffered and written out at the end (sorted
   by file_path and position).
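
For readers who want a concrete picture of the sink fan-out described earlier in this message, here is a minimal Python sketch. It is not the Impala implementation (the real sinks are written in C++ and Java); the class names, the in-memory rows, and the run_update helper are assumptions made purely for illustration. It models the example UPDATE on 'tbl': the source select produces tuples of (i, s, 5, INPUT__FILE__NAME, FILE__POSITION), the tuples are sorted by file name and position, and each sink picks only its own expressions.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Sequence, Tuple

# One tuple of the source select:
# (i, s, k, INPUT__FILE__NAME, FILE__POSITION)
Row = Tuple[int, str, int, str, int]


@dataclass
class TableSink:
    """Hypothetical stand-in for a table sink: keeps only its own expressions."""
    name: str
    project: Callable[[Row], tuple]
    written: List[tuple] = field(default_factory=list)

    def send(self, batch: Sequence[Row]) -> None:
        # Each sink picks its relevant expressions from every tuple.
        self.written.extend(self.project(row) for row in batch)


@dataclass
class MultiDataSink:
    """Hypothetical stand-in: forwards each row batch to all registered sinks."""
    sinks: List[TableSink] = field(default_factory=list)

    def send(self, batch: Sequence[Row]) -> None:
        for sink in self.sinks:
            sink.send(batch)


def run_update(rows: Sequence[Row]) -> Tuple[List[tuple], List[tuple]]:
    """Model UPDATE tbl SET k = 5 WHERE i % 100 = 11 over in-memory rows."""
    # Source select: keep matching rows and replace k with the literal 5.
    selected = [(i, s, 5, f, p) for (i, s, _k, f, p) in rows if i % 100 == 11]
    # Sort by file name and position, the order needed for delete records.
    selected.sort(key=lambda r: (r[3], r[4]))

    insert_sink = TableSink("insert", lambda r: r[:3])   # (i, s, 5)
    delete_sink = TableSink("delete", lambda r: r[3:])   # (file, position)
    MultiDataSink([insert_sink, delete_sink]).send(selected)
    return insert_sink.written, delete_sink.written
```

Calling run_update on a handful of rows returns two lists: the new data records and the position delete records, mirroring the two sets of files that the message says are tracked separately in the DmlExecState.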
In some edge cases the delete records might not get written
efficiently, but it is a smaller problem than inefficient data files.

Testing:
 * negative tests
 * planner tests
 * update all supported data types
 * partitioned tables
 * Impala/Hive interop tests
 * authz tests
 * concurrent tests

Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
A be/src/exec/multi-table-sink.cc
A be/src/exec/multi-table-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/runtime/dml-exec-state.cc
M be/src/runtime/dml-exec-state.h
M be/src/service/client-request-state.cc
M common/protobuf/control_service.proto
M common/thrift/DataSinks.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Types.thrift
M fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
M fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
A fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
M fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
A fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/ExprUtil.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M shell/impala_client.py
M shell/impala_shell.py
M testdata/datasets/functional/functional_schema_template.sql
A testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
M testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
M testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
M tests/authorization/test_ranger.py
M tests/query_test/test_iceberg.py
A tests/stress/test_update_stress.py
50 files changed, 2,298 insertions(+), 423 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/77/20677/10
--
To view, visit http://gerrit.cloudera.org:8080/20677
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
Gerrit-Change-Number: 20677
Gerrit-PatchSet: 10
Gerrit-Owner: Zoltan Borok-Nagy <borokna...@cloudera.com>
Gerrit-Reviewer: Andrew Sherman <asher...@cloudera.com>
Gerrit-Reviewer: Daniel Becker <daniel.bec...@cloudera.com>
Gerrit-Reviewer: Gabor Kaszab <gaborkas...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Noemi Pap-Takacs <npaptak...@cloudera.com>
Gerrit-Reviewer: Tamas Mate <tma...@apache.org>
Gerrit-Reviewer: Zoltan Borok-Nagy <borokna...@cloudera.com>