[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-18 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 13: Verified+1


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 13
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Mon, 18 Dec 2023 19:14:20 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-18 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has submitted this change and it was merged. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations; most importantly, it could not update
Iceberg tables if any of the following were true:
 * The UPDATE modifies the value of a partitioning column
 * The table went through partition evolution
 * The table has SORT BY properties

The problem with partitions is that the delete record and the new
data record might belong to different partitions. Records are
shuffled based on the partitions of the delete records, so the
data files might not get written efficiently.

The problem with SORT BY properties is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251
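
For illustration only, the buffer-then-order idea described above can be
sketched in a few lines of C++. This is not the actual Impala code: the
class name PositionDeleteBuffer and its methods Add and FlushSorted are
hypothetical, and the choice of std::map is an assumption made to keep
the sketch short.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the sink's in-memory buffer (not Impala's class).
class PositionDeleteBuffer {
 public:
  // Called for each incoming delete record. Each distinct path is stored
  // once, as a map key; positions are appended to that path's vector.
  void Add(const std::string& file_path, int64_t position) {
    positions_by_path_[file_path].push_back(position);
  }

  // Plays the role of the final flush: emits all buffered records ordered
  // by (file_path, position) and empties the buffer.
  std::vector<std::pair<std::string, int64_t>> FlushSorted() {
    std::vector<std::pair<std::string, int64_t>> out;
    // std::map iterates its keys in sorted order, so paths come out ordered.
    for (auto& entry : positions_by_path_) {
      std::vector<int64_t>& positions = entry.second;
      std::sort(positions.begin(), positions.end());
      for (int64_t pos : positions) out.emplace_back(entry.first, pos);
    }
    positions_by_path_.clear();
    return out;
  }

 private:
  std::map<std::string, std::vector<int64_t>> positions_by_path_;
};
```

Because ordering happens only at flush time, the incoming rows may arrive
in any order, for example sorted by the table's SORT BY columns.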

IcebergBufferedDeleteSink cannot spill to disk, so it can only
run if there is enough memory to store the delete records. Paths
are stored only once, and the int64_t positions are stored in
a vector, so updating 100 million records per node should require
around 800 MB + (100K) file paths ~= 820 MB of memory per node.
Spilling could be added later, but currently there is no
realistic need for it.
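
The estimate above can be reproduced with a small back-of-the-envelope
calculation. The function name EstimateBufferedBytes is hypothetical, and
the average path length of 200 bytes used below is an assumption chosen
so that 100K paths account for the remaining ~20 MB; container overhead
is ignored.

```cpp
#include <cassert>
#include <cstdint>

// Rough per-node estimate of buffered bytes: one int64_t position per
// deleted row, plus each distinct file path stored once.
// avg_path_bytes is an assumed average, not a measured value.
constexpr int64_t EstimateBufferedBytes(
    int64_t num_deleted_rows, int64_t num_files, int64_t avg_path_bytes) {
  return num_deleted_rows * static_cast<int64_t>(sizeof(int64_t))
      + num_files * avg_path_bytes;
}
```

With 100 million rows, 100K files, and 200-byte paths this gives
800,000,000 + 20,000,000 = 820,000,000 bytes, in line with the figure
quoted above.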

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There is only one case in which we don't allow the UPDATE statement,
namely when all of the following are true:
 * The UPDATE modifies a partition column AND
 * The right-hand side of the assignment is a non-constant expression AND
 * The UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN
condition could produce multiple matches for a data record. The
duplicated records would then be shuffled independently (based on
the new partition value) to different backend SINKs, and the
separate SINKs would not be able to detect the duplicates.

If any of the above conditions is false, the duplicated records
are shuffled together to the same SINK, which can do the
duplicate check.
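
The rule above is a plain conjunction of three conditions, which can be
written down as follows. The struct UpdateTraits, its field names, and
the function IsUpdateRejected are hypothetical and only restate the rule;
the real check lives in the frontend analysis code and may be structured
differently.

```cpp
#include <cassert>

// Hypothetical summary of an analyzed UPDATE statement (illustrative names).
struct UpdateTraits {
  bool updates_partition_column;  // the UPDATE assigns to a partition column
  bool rhs_is_non_constant;       // the assigned expression is not a constant
  bool has_join;                  // the statement involves a JOIN
};

// The UPDATE is rejected only when all three conditions hold; if any one of
// them is false, duplicates reach the same sink and can be detected there.
bool IsUpdateRejected(const UpdateTraits& t) {
  return t.updates_partition_column && t.rhs_is_non_constant && t.has_join;
}
```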

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Reviewed-on: http://gerrit.cloudera.org:8080/20760
Reviewed-by: Impala Public Jenkins 
Tested-by: Impala Public Jenkins 
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
35 files changed, 1,951 insertions(+), 349 deletions(-)

Approvals:
  Impala Public Jenkins: Looks good to me, approved; Verified


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-18 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 13:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10063/ 
DRY_RUN=false



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 13
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Mon, 18 Dec 2023 14:36:03 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-18 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 13: Code-Review+2



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 13
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Mon, 18 Dec 2023 14:36:02 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-15 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 12:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14742/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 12
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Fri, 15 Dec 2023 16:18:28 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-15 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 12:

(8 comments)

Thanks for the comments!

http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@28
PS11, Line 28:
> How come StringVal has to be redefined here?
At some point I had to declare it, but you're right, now it is redundant.


http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@138
PS11, Line 138:
> nit: empty space
Done


http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h
File be/src/exec/iceberg-delete-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h@49
PS11, Line 49:   Status ConstructPartitionInfo(const TupleRow* row, OutputPartition* output_partition);
 :   Status ConstructPartitionInfo(int32_t spec_id, const std::string& partitions,
 :   OutputPartition* output_partition);
> nit: this should fit into one line
Done


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@20
PS11, Line 20: import java.util.ArrayList;
> unused import
Done


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@30
PS11, Line 30: import org.apache.impala.common.AnalysisExc
> unused import
Done


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@37
PS11, Line 37: import org.apache.impala.thrift.TSortingOrder;
> unused import
Done


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@44
PS11, Line 44:   // Id of the delete table in the descriptor t
> unused import
Done


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34
PS11, Line 34: IcebergBufferedDeleteSink
> For me it looks like that ~40% of this class is identical with IcebergDelet
Yeah, actually I want to remove IcebergDeleteSink in the future: IMPALA-12640

Keeping them separate will make the removal easier.




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 12
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Fri, 15 Dec 2023 15:51:30 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-15 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#12).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/12

Gerrit-Project: Impala-ASF
Gerrit-Br

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-15 Thread Tamas Mate (Code Review)
Tamas Mate has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 11:

(8 comments)

Looks great Zoltan, just a few minor comments.

http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@28
PS11, Line 28: struct StringVal;
How come StringVal has to be redefined here?


http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-buffered-delete-sink.h@138
PS11, Line 138:
nit: empty space


http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h
File be/src/exec/iceberg-delete-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/11/be/src/exec/iceberg-delete-sink-base.h@49
PS11, Line 49:   Status ConstructPartitionInfo(
 :   const TupleRow* row,
 :   OutputPartition* output_partition);
nit: this should fit into one line


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@20
PS11, Line 20: import static java.lang.String.format;
unused import


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@30
PS11, Line 30: import org.apache.impala.catalog.FeFsTable;
unused import


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@37
PS11, Line 37: import org.apache.impala.planner.IcebergDeleteSink;
unused import


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@44
PS11, Line 44: import com.google.common.collect.ImmutableList;
unused import


http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/11/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34
PS11, Line 34: IcebergBufferedDeleteSink
It looks to me like ~40% of this class is identical to
IcebergDeleteSink. Do you think it would be worth inheriting from
the IcebergDeleteSink class instead of TableSink?




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 11
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Fri, 15 Dec 2023 14:27:50 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-15 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 6:

(1 comment)

Thanks for the review, Daniel!

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34
PS6, Line 34: TableSink
> Could you open a Jira for this?
Opened IMPALA-12640.




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 6
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Fri, 15 Dec 2023 12:56:17 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-15 Thread Daniel Becker (Code Review)
Daniel Becker has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 11: Code-Review+1

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34
PS6, Line 34: TableSink
> Ok, if IcebergDeleteSink will probably be deleted we can leave it as it is
Could you open a Jira for this?




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 11
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Fri, 15 Dec 2023 12:46:37 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-13 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 11: Verified+1



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 11
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Wed, 13 Dec 2023 20:22:46 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-13 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 11:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14706/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 11
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Wed, 13 Dec 2023 15:36:09 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-13 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 11:

(5 comments)

Thanks for the comments!

http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h
File be/src/exec/table-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h@90
PS10, Line 90: must already have
> Nit: "must already have filled".
Done


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115
PS6, Line 115: In case of a JOIN, and if duplicated rows ar
> It is a bit nit-picky, I meant that in the sentence "If there are duplicate
Updated the comment.


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@126
PS6, Line 126: se_.size() > 1)
> I wanted to ask if it is possible that modifyStmt_.fromClause_.size() == 1.
Even 'UPDATE tbl SET val = 3;' has a fromClause_ (maybe the null check is
redundant, but I think it should be fine), and it has a single tableRef, which
is for the target table 'tbl'.
Updated the error message.


http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
File testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test:

http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test@400
PS10, Line 400: 1
> Are these changes compared to PS7 because of a rebase?
No, this is because of the new INSERT INTO in functional_schema_template.sql.


http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
File testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@252
PS7, Line 252: FROM clause
> I asked because I'm unsure whether we should add "multiple tables" to the c
Done




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 11
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Wed, 13 Dec 2023 15:14:22 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-13 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 11:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10025/ 
DRY_RUN=true



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 11
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Wed, 13 Dec 2023 15:14:45 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-13 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#11).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, most importantly it could not update
Iceberg tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties, is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

IcebergBufferedDeleteSink cannot spill to disk, so it can only
run if there's enough memory to store the delete records. Paths
are stored only once, and the int64_t positions are stored in
a vector, so updating 100 Million records per node should require
around 800MBs + (100K) filepaths ~= 820 MBs of memory per node.
Spilling could be added later, but currently the need for it is not
too realistic.

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions meet, it would be possible to
have an incorrect JOIN condition that has multiple matches for the
data records, then the duplicated records would be shuffled
independently (based on the new partition value) to different
backend SINKs, and the different backend SINK would not be able
to detect the duplicates.

If any of the above conditions was false, then the duplicated records
would be shuffled together to the same SINK, that could do the
duplicate check.

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
35 files changed, 1,960 insertions(+), 345 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/11

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-13 Thread Daniel Becker (Code Review)
Daniel Becker has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 10:

(9 comments)

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc
File be/src/exec/iceberg-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@79
PS5, Line 79: VerifyRowsNotDuplicated
> file paths and positions are not sorted across partitions. So we would need
Ok, it can stay as it is.


http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h
File be/src/exec/table-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/10/be/src/exec/table-sink-base.h@90
PS10, Line 90: must already fill
Nit: "must already have filled".


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115
PS6, Line 115: If there are duplicates in the JOIN operator
> I'm not sure what is the point here. Duplicates are only possible in the co
It is a bit nit-picky, I meant that in the sentence "If there are duplicates 
[...] then we cannot do duplicate checking in the SINK if ..." the condition at 
the beginning is not necessary - if it happens that there are actually no 
duplicates we still can't check for them if the rows are shuffled 
independently. I'd suggest something like this:

"""
In case of a JOIN, if duplicated rows can be shuffled independently, we cannot 
do duplicate checking in the SINK.
This is the case when ...
"""


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@126
PS6, Line 126: via UPDATE FROM
> There will be always at least one tableRef because of the target table.
I wanted to ask if it is possible that modifyStmt_.fromClause_.size() == 1.

1. If it is possible, then in that case the exception (currently) won't be 
thrown.
 1a) If it should be thrown we should remove that condition.
 1b) Otherwise, the error message lists the conditions that were needed to 
trigger the error:
  - partition column,
  - non-constant RHS
  -> in this case we should include "more than one table ref in the FROM 
clause" as well

2. If modifyStmt_.fromClause_.size() == 1 is not possible, we should remove the 
relevant part of the condition on L123 and add a precondition check instead.


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
File fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java@101
PS6, Line 101:   public TSortingOrder getSortingOrder() {
> There's good chance we will need it later, e.g. optimizing a table that has
Ok, it should stay then.


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34
PS6, Line 34: TableSink
> It may have some value now, as there are some common fields/methods, but I'
Ok, if IcebergDeleteSink will probably be deleted we can leave it as it is now. 
But we should open a Jira about it then.


http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql
File testdata/datasets/functional/functional_schema_template.sql:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407
PS7, Line 3407: E TA
> Makes sense, I never really thought about this as I usually re-load my tabl
I agree, let's not make this patch even bigger.


http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
File 
testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test:

http://gerrit.cloudera.org:8080/#/c/20760/10/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test@400
PS10, Line 400: 1
Are these changes compared to PS7 because of a rebase?


http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
File 
testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@252
PS7, Line 252: FROM clause
> I think yes, otherwise you cannot have a join that produces duplicates.
I asked because I'm unsure whether we should add "mult

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 10: Verified+1


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 10
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Wed, 13 Dec 2023 01:08:43 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 10:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14679/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 10
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 20:37:55 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#10).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
35 files changed, 1,956 insertions(+), 344 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/10

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 10:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10008/ 
DRY_RUN=true


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 10
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 20:10:41 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 9: Verified-1

Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/10004/


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 9
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 19:33:03 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 9:

Build Failed

https://jenkins.impala.io/job/gerrit-code-review-checks/14675/ : Initial code 
review checks failed. See linked job for details on the failure.


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 9
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 19:19:48 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 8:

Build Failed

https://jenkins.impala.io/job/gerrit-code-review-checks/14674/ : Initial code 
review checks failed. See linked job for details on the failure.


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 8
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 19:19:32 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 9:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10004/ 
DRY_RUN=true


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 9
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 19:12:05 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 8:

(19 comments)

Thanks for the comments!

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h@56
PS6, Line 56:   /// Writes the buffered records to position delete files in the 
correct oreder.
> We could add "after sorting" or "in the correct order".
Done


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72
PS5, Line 72:   /// Nested iterator class to conveniently iterate over a 
FilePositions object.
> I feel that it is cleaner to check it here - it is a precondition of this c
Added DCHECK(!file_level_it_.second.empty()) checks as these are the most 
straightforward about what we want to ensure.

Also added DCHECK(pos_level_it != file_level_it_.second.end()) to check that we 
haven't messed up anything in NextPos()/NextFile().
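
For readers without the patch at hand, a nested iterator with this kind
of check might look roughly like the sketch below. It is not the code
under review: the FilePositions typedef, the class name and its methods
are assumptions made for illustration, and plain assert() stands in for
DCHECK.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Assumed shape of the buffered data: file path -> positions in that file.
typedef std::map<std::string, std::vector<int64_t>> FilePositions;

// Walks every (file_path, position) pair. The FilePositions object must
// outlive the iterator and must not be modified while iterating.
class FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePositions& fp)
      : end_(fp.end()), file_it_(fp.begin()) {
    EnterFile();
  }

  bool HasNext() const { return file_it_ != end_; }

  // Returns the current pair and advances to the next one.
  std::pair<std::string, int64_t> Next() {
    assert(HasNext());
    // Mirrors the second check: the inner iterator must still be valid.
    assert(pos_it_ != file_it_->second.end());
    std::pair<std::string, int64_t> result(file_it_->first, *pos_it_);
    ++pos_it_;
    if (pos_it_ == file_it_->second.end()) {
      ++file_it_;
      EnterFile();
    }
    return result;
  }

 private:
  // Positions the inner iterator at the start of the current file, if any.
  void EnterFile() {
    if (!HasNext()) return;
    // Mirrors the first check: a file entry must never have an empty vector.
    assert(!file_it_->second.empty());
    pos_it_ = file_it_->second.begin();
  }

  FilePositions::const_iterator end_;
  FilePositions::const_iterator file_it_;
  std::vector<int64_t>::const_iterator pos_it_;
};
```

Requiring non-empty position vectors as a precondition keeps Next()
simple, because it never has to skip over files without positions.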


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h
File be/src/exec/iceberg-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h@66
PS5, Line 66:   /// having the same filepath + position), because that would 
corrupt the table data
> I'd still consider adding a reference to the comment in IcebergUpdateImpl.j
Added reference


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc
File be/src/exec/iceberg-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@79
PS5, Line 79: VerifyRowsNotDuplicated
> Can we add DCHECKs for (prev_file_path_, filepath) and (prev_position_, pos
File paths and positions are not sorted across partitions, so we would need to 
reset them whenever we create new partitions. Planner tests check the presence 
of the SORT node and sort expressions.
I'd rather not modify code that is working now. Also, I think we might retire 
IcebergDeleteSink anyway and always use IcebergBufferedDeleteWriter because I 
feel it should be much faster (need to run some measurements first), plus we 
wouldn't need to maintain two classes for the same functionality.
We would lose spilling capabilities, but that might not be needed anyway.
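
To make the trade-off concrete, here is a sketch of what an ordering
check combined with the duplicate check could look like, including the
reset at partition boundaries mentioned above. It is not the existing
VerifyRowsNotDuplicated code: the class, Check() and Reset() are
hypothetical, and assert() stands in for DCHECK.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Detects duplicates in a stream that is sorted by (file_path, position)
// within each partition. Duplicates are adjacent in such a stream.
class DuplicateChecker {
 public:
  // Returns false if the record equals the previous one (a duplicate).
  bool Check(const std::string& file_path, int64_t position) {
    // Ordering precondition, valid only within a single partition.
    const bool in_order =
        !has_prev_ || prev_file_path_ < file_path ||
        (prev_file_path_ == file_path && prev_position_ <= position);
    assert(in_order);
    (void)in_order;  // silence unused-variable warnings when asserts are off
    const bool duplicate = has_prev_ && prev_file_path_ == file_path &&
                           prev_position_ == position;
    prev_file_path_ = file_path;
    prev_position_ = position;
    has_prev_ = true;
    return !duplicate;
  }

  // Must be called when a new partition starts, because the ordering is
  // not guaranteed across partitions.
  void Reset() { has_prev_ = false; }

 private:
  bool has_prev_ = false;
  std::string prev_file_path_;
  int64_t prev_position_ = 0;
};
```

Without the Reset() call the ordering assertion would fire on the first
record of a new partition whenever that record sorts before the last
record of the previous one.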


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@121
PS5, Line 121:   RETURN_IF_ERROR(ConstructPartitionInfo(row, 
current_partition_.first.get()));
> What is the reason for taking this out of InitOutputPartition()?
This method has different signatures in the child classes 
(IcebergBufferedDeleteSink), so we cannot do a virtual call in 
InitOutputPartition() anymore.
Child classes are now responsible for creating the current partition object and 
passing it to InitOutputPartition.


http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h
File be/src/exec/table-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h@90
PS6, Line 90: The caller of this function must already
> We don't have 'row' anymore. We could add that the partition key is now sup
Updated the comment.


http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc
File be/src/exec/table-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc@365
PS6, Line 365: OutputPartition
> Is there a reason for using separate OutputPartition and vector parameters
The vector only makes sense for the clustered writers, where the vector denotes 
the values that belong to the current partition in the row batch.
In IcebergBufferedDeleteSink the vector is not needed, as the created row 
batches always have rows of a single partition.
I felt that PartitionPair* is a somewhat awkward data structure, and it was a 
bit strange that WriteRowsToPartition() modifies its vector.


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115
PS6, Line 115: If there are duplicates in the JOIN operator
> Strictly speaking, we cannot check for duplicates even if there are none, s
I'm not sure what the point is here. Duplicates are only possible in the 
context of JOINs.
Otherwise a source statement like SELECT * FROM target_table WHERE ; 
should never duplicate rows AFAICT.
But I'll happily rephrase the comment if you have a suggestion.


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdate

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#9).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
35 files changed, 1,956 insertions(+), 344 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/9

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#8).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, most importantly it could not update
Iceberg tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties, is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

IcebergBufferedDeleteSink cannot spill to disk, so it can only
run if there's enough memory to store the delete records. Paths
are stored only once, and the int64_t positions are stored in
a vector, so updating 100 Million records per node should require
around 800MBs + (100K) filepaths ~= 820 MBs of memory per node.
Spilling could be added later, but currently the need for it is not
too realistic.

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, it would be possible to
have an incorrect JOIN condition that has multiple matches for the
data records. The duplicated records would then be shuffled
independently (based on the new partition value) to different
backend SINKs, and the different backend SINKs would not be able
to detect the duplicates.

If any of the above conditions were false, then the duplicated records
would be shuffled together to the same SINK, which could do the
duplicate check.
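
To make the duplicate problem concrete, here is a toy model of the shuffle.
It is not Impala's exchange logic: the UpdatedRow struct and the SinkForRow
routing function are hypothetical, and the modulo routing merely stands in
for "shuffle by the new partition value".

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical model of one row produced by the UPDATE's JOIN.
struct UpdatedRow {
  std::string file_path;    // data file that holds the original row
  int64_t position;         // position of the original row in that file
  int new_partition_value;  // partition value after the assignment
};

// Toy routing rule: the target sink depends only on the new partition value.
int SinkForRow(const UpdatedRow& row, int num_sinks) {
  assert(num_sinks > 0 && row.new_partition_value >= 0);
  return row.new_partition_value % num_sinks;
}
```

If a JOIN matches the same original row (same file_path and position) twice
and a non-constant right-hand side yields two different new partition values,
the two copies can be routed to different sinks, and neither sink sees both.
With a constant right-hand side both copies get the same value and therefore
reach the same sink.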

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
35 files changed, 1,956 insertions(+), 344 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/8
--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Bran

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Daniel Becker (Code Review)
Daniel Becker has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 7:

(14 comments)

Finally gone through all of the files.

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h
File be/src/exec/table-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h@90
PS6, Line 90: The partition key is derived from 'row'.
We don't have 'row' anymore. We could add that the partition key is now 
supposed to be available in 'output_partition' (if this is correct).


http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc
File be/src/exec/table-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc@365
PS6, Line 365: OutputPartition
Is there a reason for using separate OutputPartition and vector parameters 
instead of PartitionPair? And not clearing the vector at the end of this 
function?


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115
PS6, Line 115: If there are duplicates in the JOIN operator
Strictly speaking, we cannot check for duplicates even if there are none, so 
this clause is not needed, though it is useful to mention that the situation 
described below happens if there is a JOIN.


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@126
PS6, Line 126: via UPDATE FROM
'modifyStmt_.fromClause_.size() > 1' suggests that if there is only one table
(or view) in the FROM clause, then it should be OK. Is that viable?


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
File fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java@101
PS6, Line 101:   public TSortingOrder getSortingOrder() {
Wouldn't it be better to not have 'getSortingOrder()' in DmlStatementBase but 
at a lower level in the class hierarchy? Or would it make sense for 
OptimizeStmt too, it's just not implemented yet?


http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34
PS6, Line 34: TableSink
Would it make sense to have an IcebergDeleteSinkBase in the Java code too? Some 
functions and fields seem to be the same or very similar here and in 
IcebergDeleteSink.


http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java@76
PS7, Line 76: 1024L * 1024L
We could use ByteUnits.MEGABYTE, but see comment at 
IcebergBufferedDeleteSink.java:34.


http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/Planner.java
File fe/src/main/java/org/apache/impala/planner/Planner.java:

http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/Planner.java@926
PS7, Line 926: get
'partitionKeyExprs'?


http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql
File testdata/datasets/functional/functional_schema_template.sql:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407
PS7, Line 3407: INTO
Shouldn't this be an INSERT OVERWRITE TABLE, like for example on L3469? If we 
re-load the table without first deleting it then it will add extra rows.


http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407
PS7, Line 3407: iceberg_partition_transforms_zorder
Shouldn't this be '{table_name}'?


http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test@340
PS7, Line 340: from iceberg_partition_transforms_zorder ice_zorder, 
iceberg_partitioned source
Just curious: can we also use INNER JOIN syntax here?


http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
File 
test

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 6: Verified-1

Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/10002/


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 6
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 15:58:54 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Daniel Becker (Code Review)
Daniel Becker has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 6:

(8 comments)

With this partial review I'm clear from the beginning up to 
iceberg-delete-sink.cc.

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h@56
PS6, Line 56:   /// Writes the buffered records to position delete files.
We could add "after sorting" or "in the correct order".


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72
PS5, Line 72:   /// Nested iterator class to conveniently iterate over a 
FilePositions object.
> Here this would need another iteration over the map (though only in debug m
I feel that it is cleaner to check it here - it is a precondition of this class 
that there should be no empty vectors.

An alternative to the extra loop would be to check that the vector is not empty 
whenever we call file_level_it_->second.begin(), i.e. on L75 and L103. We could 
either add DCHECK(!file_level_it_->second.empty()) before these calls or 
DCHECK(pos_level_it_ != file_level_it_->second.end()) afterwards.

Or, even simpler, we could add DCHECK(pos_level_it_ != 
file_level_it_->second.end()) before L84 in Next(), i.e. before dereferencing 
'pos_level_it_', with an error message about the empty vector.
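
A minimal sketch of the last option, assuming a simplified iterator over a
map of file paths to position vectors. The class below is hypothetical and
not the one in the patch; it borrows the member names from this thread and
uses assert() in place of DCHECK.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

using FilePositions = std::map<std::string, std::vector<int64_t>>;

// Hypothetical nested iterator: walks every (file_path, position) pair.
class FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePositions& file_pos)
      : file_level_it_(file_pos.begin()), file_end_(file_pos.end()) {
    if (HasNext()) pos_level_it_ = file_level_it_->second.begin();
  }

  bool HasNext() const { return file_level_it_ != file_end_; }

  std::pair<std::string, int64_t> Next() {
    assert(HasNext());
    // Precondition of the class: no file path maps to an empty vector.
    // Checked right before the position iterator is dereferenced.
    assert(pos_level_it_ != file_level_it_->second.end() &&
           "empty position vector in FilePositions");
    std::pair<std::string, int64_t> result(file_level_it_->first, *pos_level_it_);
    ++pos_level_it_;
    if (pos_level_it_ == file_level_it_->second.end()) {
      // Finished this file, move on to the first position of the next one.
      ++file_level_it_;
      if (HasNext()) pos_level_it_ = file_level_it_->second.begin();
    }
    return result;
  }

 private:
  FilePositions::const_iterator file_level_it_;
  FilePositions::const_iterator file_end_;
  std::vector<int64_t>::const_iterator pos_level_it_;
};
```

The check costs nothing beyond the comparison itself and needs no extra pass
over the map.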


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h
File be/src/exec/iceberg-delete-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@30
PS4, Line 30: IcebergDeleteSinkBase
> I'm not sure what do you mean by that. It is currently an abstract class as
Right, I missed it.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc
File be/src/exec/iceberg-delete-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@66
PS4, Line 66: TIcebergPartitionTransformType::type transform_type, const 
std::string& value,
> I don't really find the comment misleading. Subclasses need to check it and
Right, I somehow assumed that superclass methods are automatically called like 
in the case of destructors.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@112
PS4, Line 112: // non void partition names and transforms.
> Yeah, currently it is checked twice when the first overload is being used,
Ok, it's indeed a cheap check.


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h
File be/src/exec/iceberg-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h@66
PS5, Line 66:   /// having the same filepath + position), because that would 
corrupt the table data
> That case is handled in IcebergUpdateImpl.java, so I added the comment ther
I'd still consider adding a reference to the comment in IcebergUpdateImpl.java, 
for example
  "For a case where deduplication is not possible at the sink level, see the 
comment in IcebergUpdateImpl::buildAndValidateSelectExprs()".


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc
File be/src/exec/iceberg-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@79
PS5, Line 79: VerifyRowsNotDuplicated
Can we add DCHECKs for (prev_file_path_, filepath) and (prev_position_, 
position) to check that the data is already sorted?
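
Something along these lines is what I have in mind. It is only a sketch: the
SortedDuplicateChecker class is hypothetical, assert() stands in for DCHECK,
and only the member names prev_file_path_ and prev_position_ are taken from
the existing code.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical checker for a stream that is expected to arrive sorted by
// (file_path, position). Duplicates are then always adjacent.
class SortedDuplicateChecker {
 public:
  // Returns false if the record repeats the previous (file_path, position).
  bool Check(const std::string& file_path, int64_t position) {
    if (has_prev_) {
      // Debug-only checks that the input really is sorted.
      assert(prev_file_path_ <= file_path);
      assert(prev_file_path_ != file_path || prev_position_ <= position);
      if (prev_file_path_ == file_path && prev_position_ == position) {
        return false;
      }
    }
    prev_file_path_ = file_path;
    prev_position_ = position;
    has_prev_ = true;
    return true;
  }

 private:
  bool has_prev_ = false;
  std::string prev_file_path_;
  int64_t prev_position_ = -1;
};
```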


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@121
PS5, Line 121:   RETURN_IF_ERROR(ConstructPartitionInfo(row, 
current_partition_.first.get()));
What is the reason for taking this out of InitOutputPartition()?



--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 6
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 14:15:48 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 7:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14669/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 7
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 13:16:20 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 7:

(1 comment)

PS7 is a rebase (apart from fixing a long line).

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-delete-sink-base.cc
File be/src/exec/iceberg-delete-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-delete-sink-base.cc@167
PS6, Line 167: if (i < partition_values_decoded.size() - 1) {
> line too long (91 > 90)
Done



--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 7
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 12:43:57 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#7).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, most importantly it could not update
Iceberg tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

IcebergBufferedDeleteSink cannot spill to disk, so it can only
run if there's enough memory to store the delete records. Paths
are stored only once, and the int64_t positions are stored in
a vector, so updating 100 million records per node should require
around 800 MB + (100K) file paths ~= 820 MB of memory per node.
Spilling could be added later, but the need for it currently seems
unlikely.

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case in which we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, it would be possible to
have an incorrect JOIN condition that has multiple matches for the
data records. The duplicated records would then be shuffled
independently (based on the new partition value) to different
backend SINKs, and the different backend SINKs would not be able
to detect the duplicates.

If any of the above conditions were false, then the duplicated records
would be shuffled together to the same SINK, which could do the
duplicate check.

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
34 files changed, 1,882 insertions(+), 334 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/7
--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 6:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14665/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 6
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 11:55:00 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 6:

(21 comments)

Thanks for the comments!

http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG@26
PS5, Line 26: FlushFinal
> Can it spill to disk if the data it has received can't be stored in memory?
It cannot spill, though it's not realistic to have that many delete records 
buffered. I've added a paragraph about it.


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72
PS5, Line 72:   /// Nested iterator class to conveniently iterate over a 
FilePositions object.
> We could also DCHECK that none of the vectors in the map are empty either.
Here this would need another iteration over the map (though only in debug 
mode). Instead of this I added a DCHECK to VerifyBufferedRecords().


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@91
PS5, Line 91:   /// that SortBufferedRecords() has been called already.
> This condition should always be true, we've just dereferenced 'pos_level_it
Done


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h@35
PS4, Line 35: /// IcebergBufferedDeleteSink buffers the Iceberg position delete 
records in its
> There could be a comment describing when this class is/should be used, e.g.
Added comment.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@45
PS3, Line 45:
> Does 'partition_descriptor_map_' exist?
Updated the comments.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48
PS3, Line 48: state, MemTracker* parent_m
> I can't see that it has changed in this file, though it did change in icebe
Yeah, sorry, updated the wrong comments.
Apart from Prepare/Open/Send/FlushFinal/Close I didn't copy legacy comments.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69
PS3, Line 69:   /// It's necessary to use a std::map so we can get back the 
file paths in order.
> The typedefs could stay here, the class could be forward-declared here and
You're right, I didn't think of that.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc
File be/src/exec/iceberg-buffered-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc@345
PS4, Line 345:   int capacity = batch->capacity();
> See comment at iceberg-delete-sink-base.cc:66.
Done


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h
File be/src/exec/iceberg-delete-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@30
PS4, Line 30: IcebergDeleteSinkBase
> Could we make this class pure virtual?
I'm not sure what you mean by that. It is currently an abstract class as it 
doesn't override everything from DataSink(), e.g. Send().


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@63
PS4, Line 63:   TIcebergPartitionTransformType::type transform_type, const 
std::string& value,
> An example would be useful.
Added comment.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@70
PS4, Line 70:
> Does the unbuffered IcebergDeleteSink also need a separate DmlExecState? Is
Moved it to IcebergBufferedDeleteSink.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc
File be/src/exec/iceberg-delete-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@66
PS4, Line 66: TIcebergPartitionTransformType::type transform_type, const 
std::string& value,
> The 'closed_' field is a bit strange, AFAICS it comes from DataSink and the
I don't really find the comment misleading. Subclasses need to check it and set 
it to false either directly or indirectly via SuperClass::Close(state);
I'd rather not modify that comment in this CR.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@88
PS4, Line 88:   ScalarExprEvaluator* spec_id_eval = 
partition_key_expr_evals_[0];
> Nit: doesn't it fit on the previous line?
Done


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@100
PS4, Line 100: _
> Why is it plural? Maybe it could be 'parti

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#6).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, most importantly it could not update
Iceberg tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

IcebergBufferedDeleteSink cannot spill to disk, so it can only
run if there's enough memory to store the delete records. Paths
are stored only once, and the int64_t positions are stored in
a vector, so updating 100 million records per node should require
around 800 MB + (100K) file paths ~= 820 MB of memory per node.
Spilling could be added later, but the need for it currently seems
unlikely.

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case in which we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, it would be possible to
have an incorrect JOIN condition that has multiple matches for the
data records. The duplicated records would then be shuffled
independently (based on the new partition value) to different
backend SINKs, and the different backend SINKs would not be able
to detect the duplicates.

If any of the above conditions were false, then the duplicated records
would be shuffled together to the same SINK, which could do the
duplicate check.

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
34 files changed, 1,881 insertions(+), 334 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/6
--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 6:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10002/ 
DRY_RUN=true



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 6
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 11:29:47 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-12 Thread Daniel Becker (Code Review)
Daniel Becker has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 5:

(7 comments)

Follow-up for my first partial review.

http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20760/5//COMMIT_MSG@26
PS5, Line 26: FlushFinal
Can it spill to disk if the data it has received can't be stored in memory?


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72
PS5, Line 72:   DCHECK(!file_pos.empty());
We could also DCHECK that none of the vectors in the map are empty either.


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@91
PS5, Line 91:   if (pos_level_it_ != file_level_it_->second.end()) {
This condition should always be true: we've just dereferenced 'pos_level_it_' 
on L84 before calling this function. We should DCHECK it instead.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@45
PS3, Line 45: partition_descriptor_map_
Does 'partition_descriptor_map_' exist?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48
PS3, Line 48: to the temporary Hdfs files
> Sorry, this was copy-pasted, updated the comments.
I can't see that it has changed in this file, though it did change in 
iceberg-delete-sink.h. Maybe those changes were intended here?
Also, the other function descriptions that were copied here may contain legacy 
comments?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69
PS3, Line 69:   class FilePositionsIterator {
> This way I can keep the typedefs private.
The typedefs could stay here, the class could be forward-declared here and 
defined in the .cc file.
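
A minimal sketch of that suggestion, shown in one translation unit for brevity. 
All names here (BufferedSinkSketch, FilePos, NextFileSize, CountBuffered) are 
hypothetical stand-ins and are not taken from the patch; assert() stands in for 
DCHECK.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// ---- What would live in the header (all names are hypothetical) ----
class BufferedSinkSketch {
 public:
  void Add(const std::string& file, int64_t pos) { file_pos_[file].push_back(pos); }
  // Defined further down, where the iterator type is complete.
  int64_t CountBuffered() const;

 private:
  // The typedef stays private to the sink.
  typedef std::map<std::string, std::vector<int64_t>> FilePos;
  // Forward declaration only: the iterator's internals stay out of the header.
  class FilePositionsIterator;

  FilePos file_pos_;
};

// ---- What would live in the .cc file ----
// A nested class is a member of the enclosing class, so it may use the
// private typedef even though it is defined out of line.
class BufferedSinkSketch::FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePos& file_pos)
      : it_(file_pos.begin()), end_(file_pos.end()) {}

  bool HasNext() const { return it_ != end_; }

  // Returns the number of positions buffered for the current file and advances.
  int64_t NextFileSize() {
    assert(HasNext());
    int64_t n = static_cast<int64_t>(it_->second.size());
    ++it_;
    return n;
  }

 private:
  FilePos::const_iterator it_;
  FilePos::const_iterator end_;
};

int64_t BufferedSinkSketch::CountBuffered() const {
  int64_t total = 0;
  FilePositionsIterator it(file_pos_);
  while (it.HasNext()) total += it.NextFileSize();
  return total;
}
```

With this layout, code that includes the header sees only the forward 
declaration, while both the typedef and the iterator remain private.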


http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h
File be/src/exec/iceberg-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h@66
PS5, Line 66:   /// and the delete files as well.
We could mention the case when the different deletes go to different sinks 
because of the differing new values and that we give an error in this case 
because we can't check it.




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 5
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Tue, 12 Dec 2023 10:34:01 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-11 Thread Daniel Becker (Code Review)
Daniel Becker has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 5:

(14 comments)

Yet another partial review, up to be/src/exec/iceberg-delete-sink-base.cc

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.h@35
PS4, Line 35: class IcebergBufferedDeleteSink : public IcebergDeleteSinkBase {
There could be a comment describing when this class is/should be used, e.g. 
what the difference between this and IcebergDeleteSink is.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc
File be/src/exec/iceberg-buffered-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-buffered-delete-sink.cc@345
PS4, Line 345: void IcebergBufferedDeleteSink::Close(RuntimeState* state) {
See comment at iceberg-delete-sink-base.cc:66.
Possibly we could convert it into DCHECK(closed_).


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h
File be/src/exec/iceberg-delete-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@30
PS4, Line 30: IcebergDeleteSinkBase
Could we make this class pure virtual?


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@63
PS4, Line 63:   std::string HumanReadablePartitionValue(
An example would be useful.

Could you explain what 'transform_result' is? How should it be interpreted and 
how should the return value be interpreted if 'transform_status' is an error?


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.h@70
PS4, Line 70:   DmlExecState dml_exec_state_;
Does the unbuffered IcebergDeleteSink also need a separate DmlExecState? Is 
that used for updates too? Or are updates always handled by the buffered 
IcebergBufferedDeleteSink?


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc
File be/src/exec/iceberg-delete-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@66
PS4, Line 66:   if (closed_) return;
The 'closed_' field is a bit strange. AFAICS it comes from DataSink, and the 
comment there says that "subclasses should check and set this in Close()." 
But in DataSink::Close() 'closed_' is set to true.

Then some subclasses like TableSinkBase and this one don't set it while others 
like IcebergBufferedDeleteSink do.

I think either the comment at 'DataSink::closed_' should be updated (if 
DataSink is responsible for setting 'closed_') OR we should have all concrete 
subclasses set it (and make intermediate classes that are never instantiated 
into pure virtual classes).
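
To make the first option concrete, here is a rough sketch in which the base 
class alone sets 'closed_' and concrete sinks only check it. The class names 
and the release counter are hypothetical, assert() stands in for DCHECK, and 
the destructor check is an illustrative choice rather than a claim about the 
existing code.

```cpp
#include <cassert>

// Hypothetical stand-in for the base sink: it alone sets 'closed_'.
class DataSinkSketch {
 public:
  // Illustrative check that Close() ran before destruction.
  virtual ~DataSinkSketch() { assert(closed_); }
  virtual void Close() { closed_ = true; }
  bool closed() const { return closed_; }

 protected:
  bool closed_ = false;
};

// Hypothetical concrete sink: checks 'closed_' but never sets it directly.
class DeleteSinkSketch : public DataSinkSketch {
 public:
  void Close() override {
    if (closed_) return;      // Close() must be idempotent.
    ++release_count_;         // Placeholder for releasing sink resources.
    DataSinkSketch::Close();  // The base class flips 'closed_'.
  }
  int release_count() const { return release_count_; }

 private:
  int release_count_ = 0;
};
```

Under this convention the comment at 'closed_' would read "subclasses should 
check this in Close() and call the base Close() last".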


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@88
PS4, Line 88: OutputPartition* output_partition) {
Nit: doesn't it fit on the previous line?


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@100
PS4, Line 100: s
Why is it plural? Maybe it could be 'partition_values_eval', like the argument 
on L111.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@104
PS4, Line 104: s
See L100.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@112
PS4, Line 112:   if (partition_key_expr_evals_.empty()) {
Optional: if this function is called from the other ConstructPartitionInfo() 
overload, this check has already been done. We could extract the rest of this 
function into a helper function that is called by both "top level" functions.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@113
PS4, Line 113: output_partition->iceberg_spec_id = table_desc_->IcebergSpecId();
Is there a valid use case for calling this function (and providing a 'spec_id') 
when 'partition_key_expr_evals_' is empty? Can 'spec_id' be different from 
'table_desc_->IcebergSpecId()'? If not, we should add a DCHECK; if they can be 
different, it's a bit unintuitive.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@159
PS4, Line 159:   stringstream external_partition_name_ss;
Is this ever written?


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@164
PS4, Line 164: encoded
It could be 'url_encoded_...' - "encoded" is ambiguous, it could also refer to 
Base64 encoding in this context.


http://gerrit.cloudera.org:8080/#/c/20760/4/be/src/exec/iceberg-delete-sink-base.cc@169
PS4, Line 169: string&
Is it intentional that we overwrite the elements of 'partition_values_decoded' 
on L171? I think it's a bit less clean than having an independent variable 
here, though it may be faster.




[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-11 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 5:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14643/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 5
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 
Gerrit-Reviewer: Zoltan Borok-Nagy 
Gerrit-Comment-Date: Mon, 11 Dec 2023 16:07:07 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-11 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 5:

(26 comments)

Thanks for the comments!

http://gerrit.cloudera.org:8080/#/c/20760/4//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20760/4//COMMIT_MSG@9
PS4, Line 9: most
> nit: I would say "mainly" or "most importantly"
Done


http://gerrit.cloudera.org:8080/#/c/20760/4//COMMIT_MSG@23
PS4, Line 23: patch int
> nit: just patch
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48
PS3, Line 48: to the temporary Hdfs files
> It can be misunderstood, one may think that we write _into_ Hdfs files, lik
Sorry, this was copy-pasted, updated the comments.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69
PS3, Line 69:   class FilePositionsIterator {
> Can this be put into the .cc file?
This way I can keep the typedefs private.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@74
PS3, Line 74:   file_level_end_ = file_pos.end();
> Shouldn't we check whether file_pos is empty?
Added DCHECK that file_pos is not empty. We should only invoke this if we have 
positions.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@78
PS3, Line 78: bool HasNext() { return file_level_it_ != file_level_end_; }
> We should check for emptiness.
At this point 'file_level_it_' should always point to a valid entry. I'll merge 
Get() and Next(), so the semantics will be clearer.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@83
PS3, Line 83:   StringValue filepath = file_level_it_->first;
> Does HasNext() mean we can call Get() or that there is a valid item after t
I merged the Get() and Next(), so the semantics are clearer.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@96
PS3, Line 96:   }
> A comment on Next() could indicate that Next() shouldn't be called when ite
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@98
PS3, Line 98:
> After incrementing 'file_level_it_' on the previous line, it may now point
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@121
PS3, Line 121:   /// Sorts the buffered records.
> We could include in the comment that it should be called after SortBuffered
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@151
PS3, Line 151:
> Nit: serve?
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc
File be/src/exec/iceberg-buffered-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@124
PS3, Line 124: DCHECK_NOTN
> We could DCHECK that filepath_sv is not NULL.
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@149
PS3, Line 149: D_TI
> Optional: to ease understanding, we could use actual types instead of auto
In the for-loop we have std::pair<..., ...> entries, so spelling out the types 
might not necessarily improve readability that much. I tried to use 
self-explanatory names instead.
In the bodies where it really matters I changed the types.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@161
PS3, Line 161: gstr
> See L149.
I kept entry as is, but introduced a new variable 'PartitionInfo& part_info' in 
the body.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@184
PS3, Line 184:
> See L149.
See above.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@196
PS3, Line 196: v_pos);
> We could use 'file' for 'file_and_pos.first' here and with the Len() too.
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@218
PS3, Line 218: i
> Isn't it only one partition? Maybe 'partition_encoded' would be more unders
Done


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@273
PS3, Line 273:   // With partition evolution it's possible that we have the same partition names
> How is this comment connected to the next line? Could you explain?
Added additional sentence.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@288
PS3, Line 288:
> Shouldn't it be '*buffer'?
Good catch, thanks.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@320
PS3, Line 320:
> Just curious, can filepath_ref->GetTupleIdx() be different from position_re
It can't, as it comes from

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-11 Thread Zoltan Borok-Nagy (Code Review)
Hello Tamas Mate, Daniel Becker, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#5).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, most importantly it could not update
Iceberg tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties, is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251
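
(Illustration only, not part of the patch: a minimal sketch of the
buffer-then-sort idea described above. The class and method names are
hypothetical, the real sink works on row batches and writes files, and
assert() stands in for DCHECK.)

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified model of a buffering delete sink.
class BufferedDeleteSketch {
 public:
  // Called for every incoming delete record; nothing is written yet.
  void Add(const std::string& file_path, int64_t position) {
    assert(position >= 0);
    buffered_[file_path].push_back(position);
  }

  // Returns the delete records ordered by (file_path, position).
  std::vector<std::pair<std::string, int64_t>> FlushFinal() {
    std::vector<std::pair<std::string, int64_t>> out;
    // std::map iterates in key order, so file paths come out sorted.
    for (auto& entry : buffered_) {
      std::sort(entry.second.begin(), entry.second.end());
      for (int64_t pos : entry.second) out.emplace_back(entry.first, pos);
    }
    return out;
  }

 private:
  std::map<std::string, std::vector<int64_t>> buffered_;
};
```

Because ordering happens only at the end, the order in which delete records
arrive no longer matters in this model.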

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case where we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, it would be possible to
have an incorrect JOIN condition that has multiple matches for the
data records, then the duplicated records would be shuffled
independently (based on the new partition value) to different
backend SINKs, and the different backend SINK would not be able
to detect the duplicates.

If any of the above conditions was false, then the duplicated records
would be shuffled together to the same SINK, that could do the
duplicate check.
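
(Illustration only, not part of the patch: a small sketch of the rule above.
The struct and function names are hypothetical, and the check is written in
C++ purely for illustration. DuplicatesStayTogether models the assumption
that rows are routed to sinks by their new partition value.)

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical summary of an analyzed UPDATE statement.
struct UpdateTraits {
  bool updates_partition_column;
  bool rhs_is_non_constant;
  bool has_join;
};

// The statement is rejected only when all three conditions hold.
bool IsUpdateRejected(const UpdateTraits& t) {
  return t.updates_partition_column && t.rhs_is_non_constant && t.has_join;
}

// Hypothetical updated row: which record is deleted and where the new one goes.
struct UpdatedRow {
  std::string file_path;
  int64_t position;
  std::string new_partition_value;
};

// Returns true if every duplicate of a (file_path, position) pair carries the
// same new partition value, i.e. all duplicates would reach the same sink.
bool DuplicatesStayTogether(const std::vector<UpdatedRow>& rows) {
  std::map<std::pair<std::string, int64_t>, std::string> first_seen;
  for (const UpdatedRow& row : rows) {
    assert(row.position >= 0);
    auto key = std::make_pair(row.file_path, row.position);
    auto res = first_seen.emplace(key, row.new_partition_value);
    if (!res.second && res.first->second != row.new_partition_value) return false;
  }
  return true;
}
```

With a constant right-hand side every duplicate gets the same new partition
value, so DuplicatesStayTogether holds and a single sink sees all duplicates.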

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
34 files changed, 1,882 insertions(+), 334 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/5

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 5
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Tamas Mate 


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-08 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 4:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14625/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 4
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Daniel Becker 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Comment-Date: Fri, 08 Dec 2023 17:18:10 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-08 Thread Daniel Becker (Code Review)
Daniel Becker has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 3:

(19 comments)

Thanks Zoltán, great patch! So far I've only been able to review it up until 
iceberg-buffered-delete-sink.cc.

http://gerrit.cloudera.org:8080/#/c/20760/2//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20760/2//COMMIT_MSG@26
PS2, Line 26: FlushFinal
Can it spill to disk if the data it has received can't be stored in memory?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@48
PS3, Line 48: to the temporary Hdfs files
It can be misunderstood, one may think that we write _into_ Hdfs files, like 
spilling to disk.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@69
PS3, Line 69:   class FilePositionsIterator {
Can this be put into the .cc file?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@74
PS3, Line 74:   pos_level_it_ = file_level_it_->second.begin();
Shouldn't we check whether file_pos is empty?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@78
PS3, Line 78:   StringValue filepath = file_level_it_->first;
We should check for emptiness.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@83
PS3, Line 83: bool HasNext() { return file_level_it_ != file_level_end_; }
Does HasNext() mean we can call Get() or that there is a valid item after the 
current one?

If it means the first, the name is easy to misunderstand.

If it means that we have a valid item after the current one, I think there's an 
off-by-one error: when we're currently at the last file position of the last 
file, both 'file_level_it_' and 'pos_level_it_' point to valid fields (HasNext() 
returns true) but there is no valid next element.

See also L98. The semantics of Get(), Next() and HasNext() could be clarified. 
We could also eliminate Get() and have Next() return the next value.
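
To illustrate the last suggestion, here is a rough sketch of a two-level 
iterator where Next() returns the current item and then advances, so HasNext() 
simply means "Next() may be called". The FilePositions typedef, the Drain() 
helper and the use of std::string are assumptions made for the sketch, and 
assert() stands in for DCHECK.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the buffered state: file path -> positions.
using FilePositions = std::map<std::string, std::vector<int64_t>>;

class FilePositionsIterator {
 public:
  explicit FilePositionsIterator(const FilePositions& file_pos)
      : file_it_(file_pos.begin()), file_end_(file_pos.end()) {
    assert(!file_pos.empty());
    // No file may have an empty position list, otherwise Next() would
    // dereference an end iterator.
    for (const auto& entry : file_pos) assert(!entry.second.empty());
    pos_it_ = file_it_->second.begin();
  }

  // True if Next() can be called.
  bool HasNext() const { return file_it_ != file_end_; }

  // Returns the current (file path, position) pair, then advances.
  // Must not be called once HasNext() is false.
  std::pair<std::string, int64_t> Next() {
    assert(HasNext());
    assert(pos_it_ != file_it_->second.end());
    std::pair<std::string, int64_t> result(file_it_->first, *pos_it_);
    ++pos_it_;
    if (pos_it_ == file_it_->second.end()) {
      ++file_it_;
      // Only touch the next file's positions if there is a next file.
      if (file_it_ != file_end_) pos_it_ = file_it_->second.begin();
    }
    return result;
  }

 private:
  FilePositions::const_iterator file_it_;
  FilePositions::const_iterator file_end_;
  std::vector<int64_t>::const_iterator pos_it_;
};

// Helper for the sketch: collects every item the iterator yields.
std::vector<std::pair<std::string, int64_t>> Drain(const FilePositions& file_pos) {
  std::vector<std::pair<std::string, int64_t>> out;
  FilePositionsIterator it(file_pos);
  while (it.HasNext()) out.push_back(it.Next());
  return out;
}
```

Since 'file_level_it_' is compared against the end before it is dereferenced, 
the past-the-end case mentioned for L98 cannot occur in this shape.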


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@96
PS3, Line 96:   DCHECK(file_level_it_ != file_level_end_);
A comment on Next() could indicate that Next() shouldn't be called when 
iteration is over.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@98
PS3, Line 98: file_level_it_
After incrementing 'file_level_it_' on the previous line, it may now point 
past-the-end, so we can't dereference it. See also L83.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@121
PS3, Line 121:   /// Verifies that there are no duplicates in the buffered delete records.
We could include in the comment that it should be called after 
SortBufferedRecords().
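
A small sketch of why the ordering matters, assuming the check compares 
neighbouring positions of a single file. The function name is hypothetical and 
assert() stands in for DCHECK.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Returns true if the same position occurs more than once.
// Precondition: 'sorted_positions' is sorted. The check only compares
// neighbours, so on unsorted input such as {3, 1, 3} it would miss the
// duplicate.
bool HasDuplicatePositions(const std::vector<int64_t>& sorted_positions) {
  assert(std::is_sorted(sorted_positions.begin(), sorted_positions.end()));
  return std::adjacent_find(sorted_positions.begin(), sorted_positions.end()) !=
      sorted_positions.end();
}
```

A neighbour comparison is linear in the number of buffered positions, which is 
why the precondition is worth documenting next to the declaration.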


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.h@151
PS3, Line 151: server
Nit: serve?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc
File be/src/exec/iceberg-buffered-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@124
PS3, Line 124: filepath_sv
We could DCHECK that filepath_sv is not NULL.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@149
PS3, Line 149: auto
Optional: to ease understanding, we could use actual types instead of auto in 
these loops.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@161
PS3, Line 161: auto
See L149.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@184
PS3, Line 184: auto
See L149.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@196
PS3, Line 196: file_and_pos.first
We could use 'file' for 'file_and_pos.first' here and with the Len() too.


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@218
PS3, Line 218: s
Isn't it only one partition? Maybe 'partition_encoded' would be more 
understandable?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@273
PS3, Line 273:   // we might need to delete rows from partition "col_trunc=1000" with both spec ids.
How is this comment connected to the next line? Could you explain?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@288
PS3, Line 288: buffer
Shouldn't it be '*buffer'?


http://gerrit.cloudera.org:8080/#/c/20760/3/be/src/exec/iceberg-buffered-delete-sink.cc@320
PS3, Line 320:   int64_t* pos_slot = row->GetTuple(position_ref->GetTupleIdx())->
Just curious, can filepath_ref->GetTupleIdx() be different from 
posit

[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-08 Thread Zoltan Borok-Nagy (Code Review)
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#4).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, i.e. it could not update Iceberg
tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties, is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch set introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case where we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, it would be possible to
have an incorrect JOIN condition that has multiple matches for the
data records, then the duplicated records would be shuffled
independently (based on the new partition value) to different
backend SINKs, and the different backend SINK would not be able
to detect the duplicates.

If any of the above conditions was false, then the duplicated records
would be shuffled together to the same SINK, that could do the
duplicate check.

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

Testing:
 * planner tests
 * e2e tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/datasets/functional/functional_schema_template.sql
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
M tests/query_test/test_iceberg.py
M tests/stress/test_update_stress.py
34 files changed, 1,869 insertions(+), 327 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/4

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 4
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Impala Public Jenkins 


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-07 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 3:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14614/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.



Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 3
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Comment-Date: Thu, 07 Dec 2023 15:36:21 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-07 Thread Zoltan Borok-Nagy (Code Review)
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#3).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, i.e. it could not update Iceberg
tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch set introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251
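
The buffering idea described above can be illustrated with a minimal
sketch. This is not Impala's implementation: the class name
BufferedDeleteSink, the methods send and flush_final, and the row
layout (file_path, position) are all hypothetical stand-ins, and the
"write" step is reduced to returning the ordered records.

```python
from collections import defaultdict


class BufferedDeleteSink:
    """Hypothetical stand-in for the buffering idea; not Impala's class."""

    def __init__(self):
        # file_path -> list of row positions to delete in that data file
        self._positions = defaultdict(list)

    def send(self, row_batch):
        """Extract (file_path, position) from each incoming row and buffer it."""
        for file_path, position in row_batch:
            self._positions[file_path].append(position)

    def flush_final(self):
        """Return the buffered delete records ordered by (file_path, position)."""
        ordered = []
        for file_path in sorted(self._positions):
            for position in sorted(self._positions[file_path]):
                ordered.append((file_path, position))
        return ordered


# Batches may arrive in any order, e.g. sorted by the table's SORT BY columns.
sink = BufferedDeleteSink()
sink.send([("b.parquet", 7), ("a.parquet", 3)])
sink.send([("a.parquet", 1), ("b.parquet", 2)])
print(sink.flush_final())
# [('a.parquet', 1), ('a.parquet', 3), ('b.parquet', 2), ('b.parquet', 7)]
```

Because ordering happens only at flush time, the incoming rows do not
need to arrive sorted by (file_path, position).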

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case where we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN
condition could produce multiple matches for a data record. The
duplicated records would then be shuffled independently (based on
the new partition value) to different backend SINKs, and those
SINKs would not be able to detect the duplicates.

If any of the above conditions were false, the duplicated records
would be shuffled together to the same SINK, which could then do the
duplicate check.
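
A toy model of the reasoning above, under stated assumptions: rows
carry a hypothetical integer new_part value, the shuffle is a plain
modulo over the number of sinks, and each sink can only compare the
rows it received itself. The function names are illustrative and do
not come from the patch.

```python
from collections import defaultdict


def update_is_rejected(updates_partition_column, rhs_is_constant, has_join):
    """True only when all three conditions from the commit message hold."""
    return updates_partition_column and not rhs_is_constant and has_join


def route(rows, num_sinks):
    """Toy shuffle: send each row to a sink chosen by its new partition value."""
    sinks = defaultdict(list)
    for row in rows:
        sinks[row["new_part"] % num_sinks].append(row)
    return sinks


def sinks_that_detect_duplicates(sinks):
    """Ids of sinks that received the same (file_path, position) more than once."""
    detected = set()
    for sink_id, rows in sinks.items():
        seen = set()
        for row in rows:
            target = (row["file_path"], row["position"])
            if target in seen:
                detected.add(sink_id)
            seen.add(target)
    return detected


# A faulty JOIN matched the row at (a.parquet, 5) twice.
# Non-constant right-hand side: the two matches get different partition values.
non_constant = [
    {"file_path": "a.parquet", "position": 5, "new_part": 0},
    {"file_path": "a.parquet", "position": 5, "new_part": 1},
]
# Constant right-hand side: both matches get the same partition value.
constant = [
    {"file_path": "a.parquet", "position": 5, "new_part": 1},
    {"file_path": "a.parquet", "position": 5, "new_part": 1},
]

print(sinks_that_detect_duplicates(route(non_constant, 2)))  # set()
print(sinks_that_detect_duplicates(route(constant, 2)))      # {1}
print(update_is_rejected(True, False, True))                 # True
print(update_is_rejected(True, True, True))                  # False
```

In the non-constant case each sink sees one copy of the row, so
neither can notice the duplicate. In the constant case both copies
land on the same sink and the check succeeds.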

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

TODO:
 * planner tests
 * e2e tests
 * concurrent tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
29 files changed, 1,168 insertions(+), 323 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/3
--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 3
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Impala Public Jenkins 


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-06 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 2:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14601/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 2
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Comment-Date: Wed, 06 Dec 2023 18:05:03 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-06 Thread Zoltan Borok-Nagy (Code Review)
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20760

to look at the new patch set (#2).

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, i.e. it could not update Iceberg
tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch set introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case where we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN
condition could produce multiple matches for a data record. The
duplicated records would then be shuffled independently (based on
the new partition value) to different backend SINKs, and those
SINKs would not be able to detect the duplicates.

If any of the above conditions were false, the duplicated records
would be shuffled together to the same SINK, which could then do the
duplicate check.

This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.

TODO:
 * planner tests
 * e2e tests
 * concurrent tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
26 files changed, 1,122 insertions(+), 251 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/2
--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 2
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Impala Public Jenkins 


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-06 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20760 )

Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..


Patch Set 1:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14599/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 1
Gerrit-Owner: Zoltan Borok-Nagy 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Comment-Date: Wed, 06 Dec 2023 15:05:03 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

2023-12-06 Thread Zoltan Borok-Nagy (Code Review)
Zoltan Borok-Nagy has uploaded this change for review. ( 
http://gerrit.cloudera.org:8080/20760 )


Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
..

IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

Part 2 had some limitations, i.e. it could not update Iceberg
tables if any of the following were true:
 * UPDATE value of partitioning column
 * UPDATE table that went through partition evolution
 * Table has SORT BY properties

The problem with partitions is that the delete record and new
data record might belong to different partitions and records
are shuffled across based on the partitions of the delete
records, hence the data files might not get written efficiently.

The problem with SORT BY properties is that we need to write
the position delete files ordered by (file_path, position).

To address the above problems, this patch set introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251

Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.

There's only one case where we don't allow the UPDATE statement:
 * UPDATE partition column AND
 * Right-hand side of assignment is non-constant expression AND
 * UPDATE statement has a JOIN

When all of the above conditions are met, an incorrect JOIN
condition could produce multiple matches for a data record. The
duplicated records would then be shuffled independently (based on
the new partition value) to different backend SINKs, and those
SINKs would not be able to detect the duplicates.

If any of the above conditions were false, the duplicated records
would be shuffled together to the same SINK, which could then do the
duplicate check.

TODO:
 * planner tests
 * e2e tests
 * concurrent tests
 * Impala/Hive interop tests

Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
---
M be/src/exec/CMakeLists.txt
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
A be/src/exec/iceberg-buffered-delete-sink.cc
A be/src/exec/iceberg-buffered-delete-sink.h
A be/src/exec/iceberg-delete-sink-base.cc
A be/src/exec/iceberg-delete-sink-base.h
A be/src/exec/iceberg-delete-sink-config.cc
A be/src/exec/iceberg-delete-sink-config.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/exprs/slot-ref.h
M be/src/runtime/dml-exec-state.h
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
A fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
26 files changed, 1,123 insertions(+), 251 deletions(-)



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/60/20760/1
--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 1
Gerrit-Owner: Zoltan Borok-Nagy