kbendick commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r704767022



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -70,11 +73,19 @@ public void write(RowData row) throws IOException {
     switch (row.getRowKind()) {
       case INSERT:
       case UPDATE_AFTER:
+        if (upsert) {
+          writer.delete(row);
+        }
         writer.write(row);
         break;
 
-      case DELETE:
       case UPDATE_BEFORE:
+        if (upsert) {
+          break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
+        }

Review comment:
       Non-blocking question:
   
   Could events arriving out of order cause problems here? My guess is no: 
table commits are serializable, so the same row (as identified by these 
equality fields) shouldn't be updated twice within a single commit. Is that 
right?
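
To make the ordering question concrete, here is a minimal toy model of the switch in the hunk above. It is only a sketch: an in-memory list stands in for the table, `UpsertSketch`, `Event`, `Kind`, and `deleteByKey` are hypothetical names rather than Iceberg or Flink APIs, and the `DELETE` branch and the non-upsert `UPDATE_BEFORE` branch are assumed because the hunk does not show them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: a list of rows stands in for the table, and "delete" removes
// every row with the same key. None of these names come from Iceberg or Flink.
class UpsertSketch {
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  static final class Event {
    final Kind kind;
    final int key;      // stands in for the equality fields
    final String value;

    Event(Kind kind, int key, String value) {
      this.kind = kind;
      this.key = key;
      this.value = value;
    }
  }

  // Applies events in arrival order and returns the surviving rows as "key=value".
  static List<String> apply(List<Event> events, boolean upsert) {
    List<Event> rows = new ArrayList<>();
    for (Event e : events) {
      switch (e.kind) {
        case INSERT:
        case UPDATE_AFTER:
          if (upsert) {
            deleteByKey(rows, e.key); // delete first, as in the diff
          }
          rows.add(e);
          break;
        case UPDATE_BEFORE:
          if (upsert) {
            break; // skipped, as in the diff: UPDATE_AFTER already deletes by key
          }
          deleteByKey(rows, e.key); // assumed: not visible in the hunk
          break;
        case DELETE:
          deleteByKey(rows, e.key); // assumed: not visible in the hunk
          break;
      }
    }
    List<String> out = new ArrayList<>();
    for (Event r : rows) {
      out.add(r.key + "=" + r.value);
    }
    return out;
  }

  static void deleteByKey(List<Event> rows, int key) {
    rows.removeIf(r -> r.key == key);
  }

  public static void main(String[] args) {
    // UPDATE_BEFORE arrives after its UPDATE_AFTER.
    List<Event> swapped = Arrays.asList(
        new Event(Kind.INSERT, 1, "a"),
        new Event(Kind.UPDATE_AFTER, 1, "b"),
        new Event(Kind.UPDATE_BEFORE, 1, "a"));
    System.out.println("upsert:     " + apply(swapped, true));
    System.out.println("non-upsert: " + apply(swapped, false));
  }
}
```

In this model the late `UPDATE_BEFORE` is harmless in upsert mode (the result is `[1=b]`) but wipes the key in non-upsert mode (the result is `[]`). The sketch says nothing about two `UPDATE_AFTER` events for the same key arriving in the wrong order, which would still matter.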

##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -222,4 +222,7 @@ private TableProperties() {
 
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
+  public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;

Review comment:
       Two questions, the second somewhat unrelated:
   
   1. Is this currently used only in streaming mode, or does it also work with 
the Flink batch sink?
   2. (Somewhat unrelated / thinking out loud) With this new 
`write.upsert.enable` flag, could we use it to build our own CDC support on 
top of Spark Structured Streaming?
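
For reference, a writer on any engine could resolve the flag from the table's property map roughly as follows. This is a minimal sketch: the two constants mirror the diff above, while `UpsertPropertySketch` and `upsertEnabled` are hypothetical names and a plain `Map<String, String>` stands in for the table properties.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: resolves the upsert flag from a plain property map.
class UpsertPropertySketch {
  // These two constants mirror the ones added to TableProperties in the diff.
  static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
  static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;

  // Hypothetical helper: returns the configured value, or the default when unset.
  static boolean upsertEnabled(Map<String, String> props) {
    String value = props.get(UPSERT_MODE_ENABLE);
    return value == null ? UPSERT_MODE_ENABLE_DEFAULT : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    System.out.println(upsertEnabled(props)); // property unset, default applies

    props.put(UPSERT_MODE_ENABLE, "true");
    System.out.println(upsertEnabled(props)); // property set explicitly
  }
}
```

Because the lookup is by exact key, a property stored under any other spelling is ignored in this sketch and the default of `false` applies.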




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


