This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new f72a15bff Flink: Backport upsert delete file metadata fixes to 0.13 
(#4786)
f72a15bff is described below

commit f72a15bff95e3d3b9c750c996f6590078baea5d6
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Thu May 19 22:20:47 2022 +0200

    Flink: Backport upsert delete file metadata fixes to 0.13 (#4786)
    
    Co-authored-by: Kyle Bendickson <[email protected]>
    Co-authored-by: liliwei <[email protected]>
    Co-authored-by: wangzeyu <[email protected]>
    Co-authored-by: openinx <[email protected]>
---
 .../java/org/apache/iceberg/io/BaseTaskWriter.java |  22 +-
 .../iceberg/io/TestTaskEqualityDeltaWriter.java    |  16 +-
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |   5 +
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    |  10 +-
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  13 +-
 .../flink/sink/RowDataTaskWriterFactory.java       |   9 +-
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  | 287 +++++++++++++++++++++
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    |  10 +-
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  13 +-
 .../flink/sink/RowDataTaskWriterFactory.java       |   9 +-
 .../apache/iceberg/flink/TestFlinkTableSink.java   |  43 +--
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  | 287 +++++++++++++++++++++
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    |  10 +-
 .../iceberg/flink/source/BoundedTableFactory.java  |  30 ++-
 .../iceberg/flink/source/BoundedTestSource.java    |  20 +-
 15 files changed, 724 insertions(+), 60 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java 
b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 2d39acb20..bc2570952 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,11 @@ public abstract class BaseTaskWriter<T> implements 
TaskWriter<T> {
      */
     protected abstract StructLike asStructLike(T data);
 
+    /**
+     * Wrap the passed in key of a row as a {@link StructLike}
+     */
+    protected abstract StructLike asStructLikeKey(T key);
+
     public void write(T row) throws IOException {
       PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), 
dataWriter.currentRows());
 
@@ -136,13 +141,16 @@ public abstract class BaseTaskWriter<T> implements 
TaskWriter<T> {
      *
      * @param key has the same columns with the equality fields.
      */
-    private void internalPosDelete(StructLike key) {
+    private boolean internalPosDelete(StructLike key) {
       PathOffset previous = insertedRowMap.remove(key);
 
       if (previous != null) {
         // TODO attach the previous row if has a positional-delete row schema 
in appender factory.
         posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+        return true;
       }
+
+      return false;
     }
 
     /**
@@ -152,9 +160,9 @@ public abstract class BaseTaskWriter<T> implements 
TaskWriter<T> {
      * @param row the given row to delete.
      */
     public void delete(T row) throws IOException {
-      internalPosDelete(structProjection.wrap(asStructLike(row)));
-
-      eqDeleteWriter.write(row);
+      if (!internalPosDelete(structProjection.wrap(asStructLike(row)))) {
+        eqDeleteWriter.write(row);
+      }
     }
 
     /**
@@ -164,9 +172,9 @@ public abstract class BaseTaskWriter<T> implements 
TaskWriter<T> {
      * @param key is the projected data whose columns are the same as the 
equality fields.
      */
     public void deleteKey(T key) throws IOException {
-      internalPosDelete(asStructLike(key));
-
-      eqDeleteWriter.write(key);
+      if (!internalPosDelete(asStructLikeKey(key))) {
+        eqDeleteWriter.write(key);
+      }
     }
 
     @Override
diff --git 
a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java 
b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index c59a757eb..82c194436 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -209,7 +209,7 @@ public class TestTaskEqualityDeltaWriter extends 
TableTestBase {
 
     WriteResult result = deltaWriter.complete();
     Assert.assertEquals("Should have a data file.", 1, 
result.dataFiles().length);
-    Assert.assertEquals("Should have a pos-delete file and an eq-delete file", 
2, result.deleteFiles().length);
+    Assert.assertEquals("Should have a pos-delete file.", 1, 
result.deleteFiles().length);
     commitTransaction(result);
     Assert.assertEquals("Should have an expected record", 
expectedRowSet(ImmutableList.of(record)), actualRowSet("*"));
 
@@ -217,12 +217,8 @@ public class TestTaskEqualityDeltaWriter extends 
TableTestBase {
     DataFile dataFile = result.dataFiles()[0];
     Assert.assertEquals(ImmutableList.of(record, record), 
readRecordsAsList(table.schema(), dataFile.path()));
 
-    // Check records in the eq-delete file.
-    DeleteFile eqDeleteFile = result.deleteFiles()[0];
-    Assert.assertEquals(ImmutableList.of(record), 
readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));
-
     // Check records in the pos-delete file.
-    DeleteFile posDeleteFile = result.deleteFiles()[1];
+    DeleteFile posDeleteFile = result.deleteFiles()[0];
     Assert.assertEquals(ImmutableList.of(
         posRecord.copy("file_path", dataFile.path(), "pos", 0L)
     ), readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), 
posDeleteFile.path()));
@@ -305,7 +301,6 @@ public class TestTaskEqualityDeltaWriter extends 
TableTestBase {
     DeleteFile eqDeleteFile = result.deleteFiles()[0];
     Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
     Assert.assertEquals(ImmutableList.of(
-        keyFunc.apply("aaa"),
         keyFunc.apply("aaa"),
         keyFunc.apply("ccc"),
         keyFunc.apply("bbb")
@@ -389,7 +384,6 @@ public class TestTaskEqualityDeltaWriter extends 
TableTestBase {
     Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
     Assert.assertEquals(ImmutableList.of(
         createRecord(3, "aaa"),
-        createRecord(5, "aaa"),
         createRecord(4, "ccc"),
         createRecord(2, "bbb")
     ), readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));
@@ -467,6 +461,7 @@ public class TestTaskEqualityDeltaWriter extends 
TableTestBase {
       deltaWriter.delete(row);
     }
 
+    // The caller of this function is responsible for passing in a record with 
only the key fields
     public void deleteKey(Record key) throws IOException {
       deltaWriter.deleteKey(key);
     }
@@ -485,6 +480,11 @@ public class TestTaskEqualityDeltaWriter extends 
TableTestBase {
       protected StructLike asStructLike(Record row) {
         return row;
       }
+
+      @Override
+      protected StructLike asStructLikeKey(Record data) {
+        return data;
+      }
     }
   }
 
diff --git 
a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
 
b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db..f4eeb8d68 100644
--- 
a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ 
b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      return wrapper.wrap(data);
+    }
   }
 }
diff --git 
a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
 
b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 79009dea7..5a3cdae7e 100644
--- 
a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ 
b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -135,7 +135,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
 
     WriteResult result = writer.complete();
     Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
-    Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
+    Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
     commitTransaction(result);
 
     Assert.assertEquals("Should have expected records.", expectedRowSet(
@@ -305,13 +305,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
     writer.write(createInsert(1, "aaa"));
     writer.write(createInsert(2, "aaa"));
 
-    writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
+    writer.write(createDelete(2, "aaa")); // 1 pos-delete.
 
     WriteResult result = writer.complete();
     Assert.assertEquals(1, result.dataFiles().length);
-    Assert.assertEquals(2, result.deleteFiles().length);
-    Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES, 
FileContent.POSITION_DELETES),
-        Sets.newHashSet(result.deleteFiles()[0].content(), 
result.deleteFiles()[1].content()));
+    Assert.assertEquals(1, result.deleteFiles().length);
+    Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
+            Sets.newHashSet(result.deleteFiles()[0].content()));
     commitTransaction(result);
 
     Assert.assertEquals("Should have expected records", expectedRowSet(
diff --git 
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
 
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db..356b7346d 100644
--- 
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ 
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final RowDataWrapper wrapper;
+  private final RowDataWrapper keyWrapper;
+  private final RowDataProjection keyProjection;
   private final boolean upsert;
 
   BaseDeltaTaskWriter(PartitionSpec spec,
@@ -58,6 +62,8 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
     this.deleteSchema = TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
     this.upsert = upsert;
+    this.keyWrapper =  new 
RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+    this.keyProjection = RowDataProjection.create(schema, deleteSchema);
   }
 
   abstract RowDataDeltaWriter route(RowData row);
@@ -74,7 +80,7 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
       case INSERT:
       case UPDATE_AFTER:
         if (upsert) {
-          writer.delete(row);
+          writer.deleteKey(keyProjection.wrap(row));
         }
         writer.write(row);
         break;
@@ -103,5 +109,10 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      return keyWrapper.wrap(data);
+    }
   }
 }
diff --git 
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 284910085..514430502 100644
--- 
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, 
table.properties(), spec);
+    } else if (upsert) {
+      // In upsert mode, only the new row is emitted using INSERT row kind. 
Therefore, any column of the inserted row
+      // may differ from the deleted row other than the primary key fields, 
and the delete file must contain values
+      // that are correct for the deleted row. Therefore, only write the 
equality delete fields.
+      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, 
table.properties(), spec,
+          ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds)), null);
     } else {
-      // TODO provide the ability to customize the equality-delete row schema.
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, 
table.properties(), spec,
           ArrayUtil.toIntArray(equalityFieldIds), schema, null);
     }
diff --git 
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java 
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 000000000..6ec35e2ff
--- /dev/null
+++ 
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final boolean isStreamingJob;
+  private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+  private TableEnvironment tEnv;
+
+  public TestFlinkUpsert(String catalogName, Namespace baseNamespace, 
FileFormat format, Boolean isStreamingJob) {
+    super(catalogName, baseNamespace);
+    this.isStreamingJob = isStreamingJob;
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, 
format={2}, isStreaming={3}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, 
FileFormat.AVRO, FileFormat.ORC}) {
+      for (Boolean isStreaming : new Boolean[] {true, false}) {
+        // Only test with one catalog as this is a file operation concern.
+        // FlinkCatalogTestBase requires the catalog name start with 
testhadoop if using hadoop catalog.
+        String catalogName = "testhadoop";
+        Namespace baseNamespace = Namespace.of("default");
+        parameters.add(new Object[] {catalogName, baseNamespace, format, 
isStreaming});
+      }
+    }
+    return parameters;
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+            .newInstance();
+        if (isStreamingJob) {
+          settingsBuilder.inStreamingMode();
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              
.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+          env.enableCheckpointing(400);
+          env.setMaxParallelism(2);
+          env.setParallelism(2);
+          tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+        } else {
+          settingsBuilder.inBatchMode();
+          tEnv = TableEnvironment.create(settingsBuilder.build());
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  @Override
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testUpsertAndQuery() {
+    String tableName = "test_upsert_query";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+    sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, 
PRIMARY KEY(id,province) NOT ENFORCED) " +
+            "PARTITIONED BY (province) WITH %s",
+        tableName, toWithClause(tableUpsertProps));
+
+    try {
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'a', TO_DATE('2022-03-01'))," +
+          "(2, 'b', TO_DATE('2022-03-01'))," +
+          "(1, 'b', TO_DATE('2022-03-01'))",
+          tableName);
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'a', TO_DATE('2022-03-02'))," +
+          "(5, 'b', TO_DATE('2022-03-02'))," +
+          "(1, 'b', TO_DATE('2022-03-02'))",
+          tableName);
+
+      List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", 
dt20220301), Row.of(1, "a", dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
+          rowsOn20220301);
+
+      List<Row> rowsOn20220302 = Lists.newArrayList(
+          Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, 
"b", dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
+          rowsOn20220302);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, 
rowsOn20220302)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyEqualToPartitionKey() {
+    // This is an SQL based reproduction of 
TestFlinkIcebergSinkV2#testUpsertOnDataKey
+    String tableName = "upsert_on_data_key";
+    try {
+      sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY 
KEY(data) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa')," +
+          "(2, 'aaa')," +
+          "(3, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa')," +
+          "(5, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa')," +
+          "(7, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+    String tableName = "upsert_on_pk_at_schema_start";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, 
PRIMARY KEY(data,dt) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 1)," +
+          "('aaa', TO_DATE('2022-03-01'), 2)," +
+          "('bbb', TO_DATE('2022-03-01'), 3)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 4)," +
+          "('bbb', TO_DATE('2022-03-01'), 5)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 6)," +
+          "('bbb', TO_DATE('2022-03-01'), 7)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+    // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, 
but the primary key fields
+    // are located at the end of the flink schema.
+    String tableName = "upsert_on_pk_at_schema_end";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, 
PRIMARY KEY(data,dt) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa', TO_DATE('2022-03-01'))," +
+          "(2, 'aaa', TO_DATE('2022-03-01'))," +
+          "(3, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa', TO_DATE('2022-03-01'))," +
+          "(5, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa', TO_DATE('2022-03-01'))," +
+          "(7, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+}
diff --git 
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
 
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 280e14807..34b7f0367 100644
--- 
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ 
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -136,7 +136,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
 
     WriteResult result = writer.complete();
     Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
-    Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
+    Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
     commitTransaction(result);
 
     Assert.assertEquals("Should have expected records.", expectedRowSet(
@@ -306,13 +306,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
     writer.write(createInsert(1, "aaa"));
     writer.write(createInsert(2, "aaa"));
 
-    writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
+    writer.write(createDelete(2, "aaa")); // 1 pos-delete.
 
     WriteResult result = writer.complete();
     Assert.assertEquals(1, result.dataFiles().length);
-    Assert.assertEquals(2, result.deleteFiles().length);
-    Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES, 
FileContent.POSITION_DELETES),
-        Sets.newHashSet(result.deleteFiles()[0].content(), 
result.deleteFiles()[1].content()));
+    Assert.assertEquals(1, result.deleteFiles().length);
+    Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
+            Sets.newHashSet(result.deleteFiles()[0].content()));
     commitTransaction(result);
 
     Assert.assertEquals("Should have expected records", expectedRowSet(
diff --git 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db..16262b22e 100644
--- 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final RowDataWrapper wrapper;
+  private final RowDataWrapper keyWrapper;
+  private final RowDataProjection keyProjection;
   private final boolean upsert;
 
   BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    this.keyWrapper =  new 
RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+    this.keyProjection = RowDataProjection.create(schema, deleteSchema);
     this.upsert = upsert;
   }
 
@@ -74,7 +80,7 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
       case INSERT:
       case UPDATE_AFTER:
         if (upsert) {
-          writer.delete(row);
+          writer.deleteKey(keyProjection.wrap(row));
         }
         writer.write(row);
         break;
@@ -103,5 +109,10 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
     protected StructLike asStructLike(RowData data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(RowData data) {
+      return keyWrapper.wrap(data);
+    }
   }
 }
diff --git 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 284910085..514430502 100644
--- 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, 
table.properties(), spec);
+    } else if (upsert) {
+      // In upsert mode, only the new row is emitted using INSERT row kind. 
Therefore, any column of the inserted row
+      // may differ from the deleted row other than the primary key fields, 
and the delete file must contain values
+      // that are correct for the deleted row. Therefore, only write the 
equality delete fields.
+      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, 
table.properties(), spec,
+          ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds)), null);
     } else {
-      // TODO provide the ability to customize the equality-delete row schema.
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, 
table.properties(), spec,
           ArrayUtil.toIntArray(equalityFieldIds), schema, null);
     }
diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 9cce4a043..0c30b0916 100644
--- 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.flink;
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -28,6 +30,7 @@ import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
@@ -35,9 +38,11 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -58,6 +63,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
   @ClassRule
   public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
+  private static final String SOURCE_TABLE = 
"default_catalog.default_database.bounded_source";
   private static final String TABLE_NAME = "test_table";
   private TableEnvironment tEnv;
   private Table icebergTable;
@@ -127,6 +133,7 @@ public class TestFlinkTableSink extends 
FlinkCatalogTestBase {
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    BoundedTableFactory.clearDataSets();
     super.clean();
   }
 
@@ -253,33 +260,37 @@ public class TestFlinkTableSink extends 
FlinkCatalogTestBase {
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, 
DistributionMode.HASH.modeName()
     );
+
+    // Initialize a BoundedSource table to precisely emit those rows in only 
one checkpoint.
+    List<Row> dataSet = IntStream.range(1, 1000)
+        .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), 
Row.of(i, "ccc")))
+        .flatMap(List::stream)
+        .collect(Collectors.toList());
+    String dataId = 
BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" +
+        " WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, 
dataId);
+    Assert.assertEquals("Should have the expected rows in source table.", 
Sets.newHashSet(dataSet),
+        Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));
+
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
         tableName, toWithClause(tableProps));
 
     try {
       // Insert data set.
-      sql("INSERT INTO %s VALUES " +
-          "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
-          "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
-          "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+      sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
 
-      Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-      SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
-          SimpleDataUtil.createRecord(1, "aaa"),
-          SimpleDataUtil.createRecord(1, "bbb"),
-          SimpleDataUtil.createRecord(1, "ccc"),
-          SimpleDataUtil.createRecord(2, "aaa"),
-          SimpleDataUtil.createRecord(2, "bbb"),
-          SimpleDataUtil.createRecord(2, "ccc"),
-          SimpleDataUtil.createRecord(3, "aaa"),
-          SimpleDataUtil.createRecord(3, "bbb"),
-          SimpleDataUtil.createRecord(3, "ccc")
-      ));
+      Assert.assertEquals("Should have the expected rows in sink table.", 
Sets.newHashSet(dataSet),
+          Sets.newHashSet(sql("SELECT * FROM %s", tableName)));
 
       // Sometimes we will have more than one checkpoint if we pass the auto 
checkpoint interval,
       // thus producing multiple snapshots.  Here we assert that each snapshot 
has only 1 file per partition.
+      Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
       Map<Long, List<DataFile>> snapshotToDataFiles = 
SimpleDataUtil.snapshotToDataFiles(table);
       for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
+        if (dataFiles.isEmpty()) {
+          continue;
+        }
+
         Assert.assertEquals("There should be 1 data file in partition 'aaa'", 
1,
             SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), 
ImmutableMap.of("data", "aaa")).size());
         Assert.assertEquals("There should be 1 data file in partition 'bbb'", 
1,
diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 000000000..6ec35e2ff
--- /dev/null
+++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final boolean isStreamingJob;
+  private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+  private TableEnvironment tEnv;
+
+  public TestFlinkUpsert(String catalogName, Namespace baseNamespace, 
FileFormat format, Boolean isStreamingJob) {
+    super(catalogName, baseNamespace);
+    this.isStreamingJob = isStreamingJob;
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, 
format={2}, isStreaming={3}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, 
FileFormat.AVRO, FileFormat.ORC}) {
+      for (Boolean isStreaming : new Boolean[] {true, false}) {
+        // Only test with one catalog as this is a file operation concern.
+        // FlinkCatalogTestBase requires the catalog name start with 
testhadoop if using hadoop catalog.
+        String catalogName = "testhadoop";
+        Namespace baseNamespace = Namespace.of("default");
+        parameters.add(new Object[] {catalogName, baseNamespace, format, 
isStreaming});
+      }
+    }
+    return parameters;
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+            .newInstance();
+        if (isStreamingJob) {
+          settingsBuilder.inStreamingMode();
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              
.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+          env.enableCheckpointing(400);
+          env.setMaxParallelism(2);
+          env.setParallelism(2);
+          tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+        } else {
+          settingsBuilder.inBatchMode();
+          tEnv = TableEnvironment.create(settingsBuilder.build());
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  @Override
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testUpsertAndQuery() {
+    String tableName = "test_upsert_query";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+    sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, 
PRIMARY KEY(id,province) NOT ENFORCED) " +
+            "PARTITIONED BY (province) WITH %s",
+        tableName, toWithClause(tableUpsertProps));
+
+    try {
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'a', TO_DATE('2022-03-01'))," +
+          "(2, 'b', TO_DATE('2022-03-01'))," +
+          "(1, 'b', TO_DATE('2022-03-01'))",
+          tableName);
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'a', TO_DATE('2022-03-02'))," +
+          "(5, 'b', TO_DATE('2022-03-02'))," +
+          "(1, 'b', TO_DATE('2022-03-02'))",
+          tableName);
+
+      List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", 
dt20220301), Row.of(1, "a", dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
+          rowsOn20220301);
+
+      List<Row> rowsOn20220302 = Lists.newArrayList(
+          Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, 
"b", dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
+          rowsOn20220302);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, 
rowsOn20220302)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyEqualToPartitionKey() {
+    // This is an SQL based reproduction of 
TestFlinkIcebergSinkV2#testUpsertOnDataKey
+    String tableName = "upsert_on_data_key";
+    try {
+      sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY 
KEY(data) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa')," +
+          "(2, 'aaa')," +
+          "(3, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa')," +
+          "(5, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa')," +
+          "(7, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+    String tableName = "upsert_on_pk_at_schema_start";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, 
PRIMARY KEY(data,dt) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 1)," +
+          "('aaa', TO_DATE('2022-03-01'), 2)," +
+          "('bbb', TO_DATE('2022-03-01'), 3)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 4)," +
+          "('bbb', TO_DATE('2022-03-01'), 5)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 6)," +
+          "('bbb', TO_DATE('2022-03-01'), 7)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @Test
+  public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+    // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, 
but the primary key fields
+    // are located at the end of the flink schema.
+    String tableName = "upsert_on_pk_at_schema_end";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, 
PRIMARY KEY(data,dt) NOT ENFORCED) " +
+              "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa', TO_DATE('2022-03-01'))," +
+          "(2, 'aaa', TO_DATE('2022-03-01'))," +
+          "(3, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa', TO_DATE('2022-03-01'))," +
+          "(5, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa', TO_DATE('2022-03-01'))," +
+          "(7, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+}
diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 280e14807..e30412ad8 100644
--- 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -136,7 +136,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
 
     WriteResult result = writer.complete();
     Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
-    Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
+    Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
     commitTransaction(result);
 
     Assert.assertEquals("Should have expected records.", expectedRowSet(
@@ -306,13 +306,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
     writer.write(createInsert(1, "aaa"));
     writer.write(createInsert(2, "aaa"));
 
-    writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
+    writer.write(createDelete(2, "aaa")); // 1 pos-delete.
 
     WriteResult result = writer.complete();
     Assert.assertEquals(1, result.dataFiles().length);
-    Assert.assertEquals(2, result.deleteFiles().length);
-    Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES, 
FileContent.POSITION_DELETES),
-        Sets.newHashSet(result.deleteFiles()[0].content(), 
result.deleteFiles()[1].content()));
+    Assert.assertEquals(1, result.deleteFiles().length);
+    Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
+        Sets.newHashSet(result.deleteFiles()[0].content()));
     commitTransaction(result);
 
     Assert.assertEquals("Should have expected records", expectedRowSet(
diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
index a039be9d5..b0041c3bc 100644
--- 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
+++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -107,12 +109,25 @@ public class BoundedTableFactory implements 
DynamicTableSourceFactory {
 
     @Override
     public ChangelogMode getChangelogMode() {
-      return ChangelogMode.newBuilder()
-          .addContainedKind(RowKind.INSERT)
-          .addContainedKind(RowKind.DELETE)
-          .addContainedKind(RowKind.UPDATE_BEFORE)
-          .addContainedKind(RowKind.UPDATE_AFTER)
-          .build();
+      Supplier<Stream<Row>> supplier = () -> 
elementsPerCheckpoint.stream().flatMap(List::stream);
+
+      // Add the INSERT row kind by default.
+      ChangelogMode.Builder builder = ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT);
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.DELETE)) {
+        builder.addContainedKind(RowKind.DELETE);
+      }
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_BEFORE)) {
+        builder.addContainedKind(RowKind.UPDATE_BEFORE);
+      }
+
+      if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_AFTER)) {
+        builder.addContainedKind(RowKind.UPDATE_AFTER);
+      }
+
+      return builder.build();
     }
 
     @Override
@@ -120,7 +135,8 @@ public class BoundedTableFactory implements 
DynamicTableSourceFactory {
       return new DataStreamScanProvider() {
         @Override
         public DataStream<RowData> 
produceDataStream(StreamExecutionEnvironment env) {
-          SourceFunction<Row> source = new 
BoundedTestSource<>(elementsPerCheckpoint);
+          boolean checkpointEnabled = 
env.getCheckpointConfig().isCheckpointingEnabled();
+          SourceFunction<Row> source = new 
BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);
 
           RowType rowType = (RowType) 
tableSchema.toRowDataType().getLogicalType();
           // Converter to convert the Row to RowData.
diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
index 6f6712dea..54e44ee5b 100644
--- 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
+++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
  * A stream source that:
@@ -39,6 +40,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 public final class BoundedTestSource<T> implements SourceFunction<T>, 
CheckpointListener {
 
   private final List<List<T>> elementsPerCheckpoint;
+  private final boolean checkpointEnabled;
   private volatile boolean running = true;
 
   private final AtomicInteger numCheckpointsComplete = new AtomicInteger(0);
@@ -46,8 +48,13 @@ public final class BoundedTestSource<T> implements 
SourceFunction<T>, Checkpoint
   /**
    * Emits all those elements in several checkpoints.
    */
-  public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+  public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean 
checkpointEnabled) {
     this.elementsPerCheckpoint = elementsPerCheckpoint;
+    this.checkpointEnabled = checkpointEnabled;
+  }
+
+  public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+    this(elementsPerCheckpoint, true);
   }
 
   /**
@@ -59,7 +66,14 @@ public final class BoundedTestSource<T> implements 
SourceFunction<T>, Checkpoint
 
   @Override
   public void run(SourceContext<T> ctx) throws Exception {
-    for (int checkpoint = 0; checkpoint < elementsPerCheckpoint.size(); 
checkpoint++) {
+    if (!checkpointEnabled) {
+      Preconditions.checkArgument(elementsPerCheckpoint.size() <= 1,
+              "There should be at most one list in the elementsPerCheckpoint 
when checkpoint is disabled.");
+      
elementsPerCheckpoint.stream().flatMap(List::stream).forEach(ctx::collect);
+      return;
+    }
+
+    for (List<T> elements : elementsPerCheckpoint) {
 
       final int checkpointToAwait;
       synchronized (ctx.getCheckpointLock()) {
@@ -70,7 +84,7 @@ public final class BoundedTestSource<T> implements 
SourceFunction<T>, Checkpoint
         // affected in the end. Setting the delta to be 2 is introducing the 
variable that produce un-continuous
         // checkpoints that emit the records buffer from 
elementsPerCheckpoints.
         checkpointToAwait = numCheckpointsComplete.get() + 2;
-        for (T element : elementsPerCheckpoint.get(checkpoint)) {
+        for (T element : elements) {
           ctx.collect(element);
         }
       }

Reply via email to