Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#12).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
......................................................................

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, most importantly it could not update
Iceberg tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties, is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

IcebergBufferedDeleteSink cannot spill to disk, so it can only
run if there's enough memory to store the delete records. Paths
are stored only once, and the int64_t positions are stored in
a vector, so updating 100 Million records per node should require
around 800MBs + (100K) filepaths ~= 820 MBs of memory per node.
Spilling could be added later, but currently the need for it is not
too realistic.

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions meet, it would be possible to
have an incorrect JOIN condition that has multiple matches for the
data records, then the duplicated records would be shuffled
independently (based on the new partition value) to different
backend SINKs, and the different backend SINK would not be able
to detect the duplicates.

If any of the above conditions was false, then the duplicated records
would be shuffled together to the same SINK, that could do the
duplicate check.

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M 
testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M 
testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M 
testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A 
testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A 
testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
35 files changed, 1,951 insertions(+), 349 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/12
--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 12
Gerrit-Owner: Zoltan Borok-Nagy <borokna...@cloudera.com>
Gerrit-Reviewer: Daniel Becker <daniel.bec...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Tamas Mate <tma...@apache.org>
Gerrit-Reviewer: Zoltan Borok-Nagy <borokna...@cloudera.com>

Reply via email to