Hello Andrew Sherman, Tamas Mate, Daniel Becker, Gabor Kaszab, Noemi 
Pap-Takacs, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20677

to look at the new patch set (#5).

Change subject: IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables
......................................................................

IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables

This patch adds limited UPDATE support for Iceberg tables. The
limitations mean users cannot update Iceberg tables if any of
the following is true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The above limitations will be resolved by part 3. The usual limitations
like writing non-Parquet files, using copy-on-write, modifying V1 tables
are out of scope of IMPALA-12313.

This patch implements UPDATEs with the merge-on-read technique. This
means the UPDATE statement writes both data files and delete files.
Data files contain the updated records, delete files contain the
position delete records of the old data records that have been
touched.

To achieve the above this patch introduces a new sink: MultiDataSink.
We can configure multiple TableSinks for a single MultiDataSink object.
During execution, the row batches sent to the MultiDataSink will be
forwarded to all the TableSinks that have been registered.

The UPDATE statement for an Iceberg table creates a source select
statement with all table columns and virtual columns INPUT__FILE__NAME
and FILE__POSITION. E.g. imagine we have a table 'tbl' with schema
(i int, s string, k int), and we update the table with:

  UPDATE tbl SET k = 5 WHERE i % 100 = 11;

 The generated source statement will be ==>

  SELECT i, s, 5, INPUT__FILE__NAME, FILE__POSITION
  FROM tbl WHERE i % 100 = 11;

Then we create two table sinks that refer to expressions from the above
source statement:

  Insert sink (i, s, 5)
  Delete sink (INPUT__FILE__NAME, FILE__POSITION)

The tuples in the rowbatch of MultiDataSink contain slots for all the
above expressions (i, s, 5, INPUT__FILE__NAME, FILE__POSITION).
MultiDataSink forwards each row batch to each registered TableSink.
They will pick their relevant expressions from the tuple and write
data/delete files. The tuples are sorted by INPUTE__FILE__NAME and
FILE__POSITION because we need to write the delete records in this
order.

For partitioned tables we need to shuffle and sort the input tuples.
In this case we also add virtual columns "PARTITION__SPEC__ID" and
"ICEBERG__PARTITION__SERIALIZED" to the source statement and shuffle
and sort the rows based on them.

Data files and delete files are now separated in the DmlExecState, so
at the end of the operation we'll have two sets of files. We use these
two sets to create a new Iceberg snapshot.

Why does this patch has the limitations?
 - Because we are shuffling and sorting rows based on the delete
   records and their partitions. This means that the new data files
   might not get written in an efficient way, e.g. there will be
   too many of them, or we will need to keep too many open file
   handles during writing.
   Also, if the table has SORT BY properties, we cannot respect
   it as the input rows are ordered in a way to favor the position
   deletes.
   Patch 3 will introduce a buffering writer for position delete
   files. This means we will shuffle and sort records based on
   the data records' partitions and SORT BY properties while
   delete records get buffered and written out at the end (sorted
   by file_path and position). In some edge cases the delete records
   might not get written efficiently, but it is a smaller problem
   then inefficient data files.

Testing:
 * negative tests
 * planner tests
 * update all supported data types
 * partitioned tables
 * Impala/Hive interop tests
 * authz tests
 * concurrent tests

Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
A be/src/exec/multi-table-sink.cc
A be/src/exec/multi-table-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/runtime/dml-exec-state.cc
M be/src/runtime/dml-exec-state.h
M be/src/service/client-request-state.cc
M common/protobuf/control_service.proto
M common/thrift/DataSinks.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Types.thrift
M fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
M fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
M fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
A fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
M fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
M fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
A fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/ExprUtil.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M shell/impala_client.py
M shell/impala_shell.py
M testdata/datasets/functional/functional_schema_template.sql
A 
testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
A 
testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
M 
testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
M 
testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
M tests/authorization/test_ranger.py
M tests/query_test/test_iceberg.py
A tests/stress/test_update_stress.py
48 files changed, 2,302 insertions(+), 381 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/77/20677/5
--
To view, visit http://gerrit.cloudera.org:8080/20677
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
Gerrit-Change-Number: 20677
Gerrit-PatchSet: 5
Gerrit-Owner: Zoltan Borok-Nagy <borokna...@cloudera.com>
Gerrit-Reviewer: Andrew Sherman <asher...@cloudera.com>
Gerrit-Reviewer: Daniel Becker <daniel.bec...@cloudera.com>
Gerrit-Reviewer: Gabor Kaszab <gaborkas...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Noemi Pap-Takacs <npaptak...@cloudera.com>
Gerrit-Reviewer: Tamas Mate <tma...@apache.org>
Gerrit-Reviewer: Zoltan Borok-Nagy <borokna...@cloudera.com>

Reply via email to