openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r700913685



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -321,7 +339,27 @@ private String operatorName(String suffix) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLED, UPSERT_MODE_ENABLED_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+        Preconditions.checkState(!equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {

Review comment:
   Suppose we have a table with the columns `user_id` and `hour`, where the
   business primary key is `user_id`, which means the table should have at
   most one row for any given `user_id`. Now let's talk about the partition
   strategy.

   If we partition the table only by the `hour` field, two different hour
   partitions may contain the same `user_id`, because people may insert the
   same `user_id` into both `hour=01` and `hour=02`. If we want to keep the
   primary key semantics, we need to delete the old `user_id` row in
   `hour=01` first, and then insert the new row in `hour=02`. But when an
   INSERT comes, we don't know which partition holds that `user_id`, so we
   have to broadcast the DELETE to all the partitions, which is quite
   inefficient.
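
   To make the cost concrete, here is a minimal toy sketch in plain Java. It
   is not Iceberg or Flink code: the class `HourPartitionedUpsertSketch` and
   its methods are hypothetical names, and each partition is modeled as a
   simple in-memory map. It only illustrates that, when the partition key
   (`hour`) is not part of the primary key (`user_id`), every upsert has to
   send its DELETE to every existing partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only (hypothetical names, not Iceberg/Flink APIs):
// the table is a map from hour partition to (user_id -> row value).
class HourPartitionedUpsertSketch {
  private final Map<String, Map<Long, String>> partitions = new TreeMap<>();

  // Plain insert: writes into the target partition and nothing else,
  // so the same user_id can end up in several hour partitions.
  void insertOnly(long userId, String hour, String value) {
    partitions.computeIfAbsent(hour, h -> new HashMap<>()).put(userId, value);
  }

  // Upsert that preserves "at most one row per user_id".
  // The writer does not know which partition holds the old row, so it
  // sends the DELETE to every existing partition before inserting.
  // Returns how many partitions the DELETE was broadcast to.
  int upsert(long userId, String hour, String value) {
    int deletesIssued = 0;
    for (Map<Long, String> rows : partitions.values()) {
      rows.remove(userId);
      deletesIssued++;
    }
    insertOnly(userId, hour, value);
    return deletesIssued;
  }

  // Lists the hour partitions that currently contain the given user_id,
  // in sorted partition order.
  List<String> partitionsContaining(long userId) {
    List<String> hours = new ArrayList<>();
    for (Map.Entry<String, Map<Long, String>> e : partitions.entrySet()) {
      if (e.getValue().containsKey(userId)) {
        hours.add(e.getKey());
      }
    }
    return hours;
  }
}
```

   With `insertOnly`, writing `user_id=1` into `hour=01` and then `hour=02`
   leaves two rows for the same key. With `upsert`, the key stays unique, but
   the number of DELETEs per write grows with the number of partitions.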
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.