Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20760 )
Change subject: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
......................................................................

Patch Set 8:

(19 comments)

Thanks for the comments!

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/iceberg-buffered-delete-sink.h@56
PS6, Line 56: /// Writes the buffered records to position delete files in the correct oreder.
> We could add "after sorting" or "in the correct order".
Done

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h
File be/src/exec/iceberg-buffered-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-buffered-delete-sink.h@72
PS5, Line 72: /// Nested iterator class to conveniently iterate over a FilePositions object.
> I feel that it is cleaner to check it here - it is a precondition of this c
Added DCHECK(!file_level_it_.second.empty()) checks, as these state most directly what we want to ensure. Also added DCHECK(pos_level_it != file_level_it_.second.end()) to check that we haven't messed up anything in NextPos()/NextFile().

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h
File be/src/exec/iceberg-delete-sink.h:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.h@66
PS5, Line 66: /// having the same filepath + position), because that would corrupt the table data
> I'd still consider adding a reference to the comment in IcebergUpdateImpl.j
Added reference

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc
File be/src/exec/iceberg-delete-sink.cc:

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@79
PS5, Line 79: VerifyRowsNotDuplicated
> Can we add DCHECKs for (prev_file_path_, filepath) and (prev_position_, pos
File paths and positions are not sorted across partitions.
So we would need to reset them whenever we create new partitions. Planner tests check the presence of the SORT node and sort expressions. I'd rather not modify code that is working now. Also, I think we might retire IcebergDeleteSink anyway and always use IcebergBufferedDeleteWriter, because I feel it should be much faster (I need to run some measurements first), plus we wouldn't need to maintain two classes for the same functionality. We would lose spilling capabilities, but that might not be needed anyway.

http://gerrit.cloudera.org:8080/#/c/20760/5/be/src/exec/iceberg-delete-sink.cc@121
PS5, Line 121: RETURN_IF_ERROR(ConstructPartitionInfo(row, current_partition_.first.get()));
> What is the reason for taking this out of InitOutputPartition()?
This method has different signatures in the child classes (IcebergBufferedDeleteSink), so we cannot do a virtual call in InitOutputPartition() anymore. Child classes are now responsible for creating the current partition object and passing it to InitOutputPartition.

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h
File be/src/exec/table-sink-base.h:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.h@90
PS6, Line 90: The caller of this function must already
> We don't have 'row' anymore. We could add that the partition key is now sup
Updated the comment.

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc
File be/src/exec/table-sink-base.cc:

http://gerrit.cloudera.org:8080/#/c/20760/6/be/src/exec/table-sink-base.cc@365
PS6, Line 365: OutputPartition
> Is there a reason for using separate OutputPartition and vector parameters
The vector only makes sense for the clustered writers, where the vector denotes the values that belong to the current partition in the row batch. In IcebergBufferedDeleteSink the vector is not needed, as the created row batches always have rows of a single partition.
I felt that PartitionPair* is a bit of an awkward data structure, and it was a bit strange that WriteRowsToPartition() modifies its vector.

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@115
PS6, Line 115: If there are duplicates in the JOIN operator
> Strictly speaking, we cannot check for duplicates even if there are none, s
I'm not sure what the point is here. Duplicates are only possible in the context of JOINs. Otherwise a source statement like
SELECT * FROM target_table WHERE <condition>;
should never duplicate rows AFAICT. But I'll happily rephrase the comment if you have a suggestion.

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@126
PS6, Line 126: via UPDATE FROM
> 'modifyStmt_.fromClause_.size() > 1' suggests if there is only one table (o
There will always be at least one tableRef because of the target table.

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
File fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java@101
PS6, Line 101: public TSortingOrder getSortingOrder() {
> Wouldn't it be better to not have 'getSortingOrder()' in DmlStatementBase b
There's a good chance we will need it later, e.g. when optimizing a table that has SORT BY properties.
http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/6/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java@34
PS6, Line 34: TableSink
> Would it make sense to have an IcebergDeleteSinkBase in the Java code too?
It may have some value now, as there are some common fields/methods, but I'm thinking of removing IcebergDeleteSink in the future, so it may not be worth the effort unless you feel strongly about it. In the backend there was much more common code between the two classes, so I had to introduce a base class.

http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
File fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java:

http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java@76
PS7, Line 76: ByteUnits.MEG
> We could use ByteUnits.MEGABYTE, but see comment at IcebergBufferedDeleteSi
Done

http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/Planner.java
File fe/src/main/java/org/apache/impala/planner/Planner.java:

http://gerrit.cloudera.org:8080/#/c/20760/7/fe/src/main/java/org/apache/impala/planner/Planner.java@926
PS7, Line 926: par
> 'partitionKeyExprs'?
Done

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql
File testdata/datasets/functional/functional_schema_template.sql:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407
PS7, Line 3407: E TA
> Shouldn't this be an INSERT OVERWRITE TABLE, like for example on L3469? If
Makes sense, I never really thought about this as I usually re-load my tables via "./bin/load-data.py -f ...", and that command really re-creates them.
But unfortunately we cannot use INSERT OVERWRITE here, because the table has BUCKET partitioning and in that case we forbid INSERT OVERWRITE. But we can do TRUNCATE, then INSERT INTO. We have a couple of cases where we could switch to INSERT OVERWRITE / TRUNCATE+INSERT INTO, but I think it would be better to do that in a separate patch.

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/datasets/functional/functional_schema_template.sql@3407
PS7, Line 3407: x}.{table_name};
> Shouldn't this be '{table_name}'?
Done

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test@340
PS7, Line 340: from iceberg_partition_transforms_zorder ice_zorder, iceberg_partitioned source
> Just curious: can we also use INNER JOIN syntax here?
Yes, and plans should be the same. Added a test with it too.

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
File testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test@252
PS7, Line 252: FROM clause
> Is it important that it is a FROM clause with multiple tables?
I think yes, otherwise you cannot have a join that produces duplicates. A standard UPDATE statement only has the following clauses:
UPDATE table_name
SET column1 = value1, column2 = value2, ...
WHERE condition;
You might be able to write scalar subqueries in the condition, but they don't produce duplicates.
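
To make the duplicate argument concrete, here is a minimal Python sketch. It is not Impala code. It assumes a row is identified by a (file path, position) pair, as in the delete sink comments earlier in this thread; the function names, the 'id' join key, and the sample data are all hypothetical. It shows that a source with two rows matching one target row makes the join emit that target row twice, which is the case a duplicate check over sorted (file path, position) pairs has to catch.

```python
from typing import List, Tuple

# Hypothetical shapes, for illustration only:
#   target row: (file_path, position, id)
#   source row: (id, new_value)
TargetRow = Tuple[str, int, int]
SourceRow = Tuple[int, str]
UpdateRow = Tuple[str, int, str]


def join_target_with_source(
    target: List[TargetRow], source: List[SourceRow]
) -> List[UpdateRow]:
    """Inner join on id; emits one (file_path, position, new_value) per match."""
    out = []
    for file_path, pos, target_id in target:
        for source_id, new_value in source:
            if target_id == source_id:
                out.append((file_path, pos, new_value))
    return out


def find_duplicate_row_ids(rows: List[UpdateRow]) -> List[Tuple[str, int]]:
    """Return each (file_path, position) that occurs more than once.

    Rows are sorted first, so duplicates are adjacent and one pass that
    remembers the previous pair is enough.
    """
    dups = []
    prev = None
    for file_path, pos, _ in sorted(rows, key=lambda r: (r[0], r[1])):
        cur = (file_path, pos)
        if cur == prev and (not dups or dups[-1] != cur):
            dups.append(cur)
        prev = cur
    return dups


if __name__ == "__main__":
    target = [("a.parquet", 0, 1), ("a.parquet", 1, 2)]
    # Two source rows share id 1, so target row ("a.parquet", 0) matches twice.
    source = [(1, "x"), (1, "z"), (2, "y")]
    print(find_duplicate_row_ids(join_target_with_source(target, source)))
```

With a source whose ids are unique, the same join emits each target row at most once, so the check finds nothing. That matches the single-table case, where no join is involved.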
http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
File testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test:

http://gerrit.cloudera.org:8080/#/c/20760/7/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test@1
PS7, Line 1: ====
> Could also add a table with "sort by".
I already had one in the stress test. But I also added SORT BY to some tables here.

http://gerrit.cloudera.org:8080/#/c/20760/7/tests/stress/test_update_stress.py
File tests/stress/test_update_stress.py:

http://gerrit.cloudera.org:8080/#/c/20760/7/tests/stress/test_update_stress.py@65
PS7, Line 65:
> Nit: this could be included in the literal of 'stmt'.
Done

--
To view, visit http://gerrit.cloudera.org:8080/20760
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Gerrit-Change-Number: 20760
Gerrit-PatchSet: 8
Gerrit-Owner: Zoltan Borok-Nagy <borokna...@cloudera.com>
Gerrit-Reviewer: Daniel Becker <daniel.bec...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Tamas Mate <tma...@apache.org>
Gerrit-Reviewer: Zoltan Borok-Nagy <borokna...@cloudera.com>
Gerrit-Comment-Date: Tue, 12 Dec 2023 19:00:59 +0000
Gerrit-HasComments: Yes