openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r700913685
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
equalityFieldIds.add(field.fieldId());
}
}
- IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+ // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+ boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+ UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT);
+
+ // Validate the equality fields and partition fields if we enable the upsert mode.
+ if (upsertMode) {
+ Preconditions.checkState(!overwrite,
+ "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+ Preconditions.checkState(!equalityFieldIds.isEmpty(),
+ "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+ if (!table.spec().isUnpartitioned()) {
Review comment:
Suppose we have a table with `user_id` and `hour` columns, and the business primary key is `user_id`. That means the table should contain at most one row for any given `user_id`. Now let's talk about the partition strategy.

If we partition the table only by the `hour` field, two different hour partitions may contain the same `user_id`, because people may insert that `user_id` into both `hour=01` and `hour=02`. To keep the primary key semantics, we need to delete the old `user_id` row in `hour=01` first, then insert the new `user_id` row into `hour=02`. But when an INSERT comes in, we don't know which partition holds that `user_id`, so we have to broadcast the DELETE to all partitions, which is quite inefficient.
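To make the trade-off above concrete, here is a toy model in plain Java with no Iceberg or Flink dependencies. Everything in it (the class name, the methods, the map-based "table") is hypothetical and is not the PR's implementation. It counts how many partitions must receive a DELETE on upsert when the table is partitioned by `hour` versus by the key itself. It also shows a subset check (partition columns must be a subset of the equality columns), which is presumably the kind of validation the truncated `if (!table.spec().isUnpartitioned())` branch performs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class UpsertPartitionSketch {

  // Hypothetical check (not the PR's code): in upsert mode every partition source
  // column should also be an equality column, so each key lives in exactly one partition.
  static boolean partitionFieldsCoveredByEqualityFields(
      Set<String> partitionFields, Set<String> equalityFields) {
    return equalityFields.containsAll(partitionFields);
  }

  // Toy "table": partition value -> user_ids stored in that partition.
  private final Map<String, Set<String>> partitions = new HashMap<>();

  // Partitioned by `hour`, which is not part of the key: the old row for userId
  // may sit in any partition, so the DELETE is sent to every existing partition.
  // Returns how many partitions received the DELETE.
  int upsertPartitionedByHour(String userId, String hour) {
    int touched = 0;
    for (Set<String> keys : partitions.values()) {
      keys.remove(userId); // broadcast DELETE
      touched++;
    }
    partitions.computeIfAbsent(hour, h -> new HashSet<>()).add(userId); // INSERT
    return touched;
  }

  // Partitioned by the key itself (identity on user_id): the partition is derived
  // from userId, so the DELETE targets exactly one partition.
  int upsertPartitionedByKey(String userId) {
    Set<String> keys = partitions.computeIfAbsent(userId, k -> new HashSet<>());
    keys.remove(userId); // targeted DELETE
    keys.add(userId);    // INSERT
    return 1;
  }

  // Number of partitions holding userId; primary-key semantics require at most 1.
  int partitionsContaining(String userId) {
    int count = 0;
    for (Set<String> keys : partitions.values()) {
      if (keys.contains(userId)) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    UpsertPartitionSketch byHour = new UpsertPartitionSketch();
    byHour.upsertPartitionedByHour("u1", "01");
    byHour.upsertPartitionedByHour("u2", "02");
    int touched = byHour.upsertPartitionedByHour("u1", "03");
    System.out.println("by hour: DELETE sent to " + touched + " partitions");
    System.out.println("by hour: u1 in " + byHour.partitionsContaining("u1") + " partition(s)");

    UpsertPartitionSketch byKey = new UpsertPartitionSketch();
    byKey.upsertPartitionedByKey("u1");
    byKey.upsertPartitionedByKey("u2");
    System.out.println("by key: DELETE sent to " + byKey.upsertPartitionedByKey("u1") + " partition");

    System.out.println(partitionFieldsCoveredByEqualityFields(Set.of("hour"), Set.of("user_id")));    // false
    System.out.println(partitionFieldsCoveredByEqualityFields(Set.of("user_id"), Set.of("user_id"))); // true
  }
}
```

In the `hour` case, the DELETE fan-out grows with the number of existing partitions. When every partition column is also an equality column, the target partition can be computed from the key alone, which is why a subset check like the one above rules out the broadcast case.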