This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/0.13.x by this push:
new f72a15bff Flink: Backport upsert delete file metadata fixes to 0.13 (#4786)
f72a15bff is described below
commit f72a15bff95e3d3b9c750c996f6590078baea5d6
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Thu May 19 22:20:47 2022 +0200
Flink: Backport upsert delete file metadata fixes to 0.13 (#4786)
Co-authored-by: Kyle Bendickson <[email protected]>
Co-authored-by: liliwei <[email protected]>
Co-authored-by: wangzeyu <[email protected]>
Co-authored-by: openinx <[email protected]>
---
.../java/org/apache/iceberg/io/BaseTaskWriter.java | 22 +-
.../iceberg/io/TestTaskEqualityDeltaWriter.java | 16 +-
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 5 +
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 10 +-
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 13 +-
.../flink/sink/RowDataTaskWriterFactory.java | 9 +-
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 287 +++++++++++++++++++++
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 10 +-
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 13 +-
.../flink/sink/RowDataTaskWriterFactory.java | 9 +-
.../apache/iceberg/flink/TestFlinkTableSink.java | 43 +--
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 287 +++++++++++++++++++++
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 10 +-
.../iceberg/flink/source/BoundedTableFactory.java | 30 ++-
.../iceberg/flink/source/BoundedTestSource.java | 20 +-
15 files changed, 724 insertions(+), 60 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 2d39acb20..bc2570952 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -115,6 +115,11 @@ public abstract class BaseTaskWriter<T> implements
TaskWriter<T> {
*/
protected abstract StructLike asStructLike(T data);
+ /**
+ * Wrap the passed in key of a row as a {@link StructLike}
+ */
+ protected abstract StructLike asStructLikeKey(T key);
+
public void write(T row) throws IOException {
PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(),
dataWriter.currentRows());
@@ -136,13 +141,16 @@ public abstract class BaseTaskWriter<T> implements
TaskWriter<T> {
*
* @param key has the same columns with the equality fields.
*/
- private void internalPosDelete(StructLike key) {
+ private boolean internalPosDelete(StructLike key) {
PathOffset previous = insertedRowMap.remove(key);
if (previous != null) {
// TODO attach the previous row if has a positional-delete row schema
in appender factory.
posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+ return true;
}
+
+ return false;
}
/**
@@ -152,9 +160,9 @@ public abstract class BaseTaskWriter<T> implements
TaskWriter<T> {
* @param row the given row to delete.
*/
public void delete(T row) throws IOException {
- internalPosDelete(structProjection.wrap(asStructLike(row)));
-
- eqDeleteWriter.write(row);
+ if (!internalPosDelete(structProjection.wrap(asStructLike(row)))) {
+ eqDeleteWriter.write(row);
+ }
}
/**
@@ -164,9 +172,9 @@ public abstract class BaseTaskWriter<T> implements
TaskWriter<T> {
* @param key is the projected data whose columns are the same as the
equality fields.
*/
public void deleteKey(T key) throws IOException {
- internalPosDelete(asStructLike(key));
-
- eqDeleteWriter.write(key);
+ if (!internalPosDelete(asStructLikeKey(key))) {
+ eqDeleteWriter.write(key);
+ }
}
@Override
diff --git
a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index c59a757eb..82c194436 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -209,7 +209,7 @@ public class TestTaskEqualityDeltaWriter extends
TableTestBase {
WriteResult result = deltaWriter.complete();
Assert.assertEquals("Should have a data file.", 1,
result.dataFiles().length);
- Assert.assertEquals("Should have a pos-delete file and an eq-delete file",
2, result.deleteFiles().length);
+ Assert.assertEquals("Should have a pos-delete file.", 1,
result.deleteFiles().length);
commitTransaction(result);
Assert.assertEquals("Should have an expected record",
expectedRowSet(ImmutableList.of(record)), actualRowSet("*"));
@@ -217,12 +217,8 @@ public class TestTaskEqualityDeltaWriter extends
TableTestBase {
DataFile dataFile = result.dataFiles()[0];
Assert.assertEquals(ImmutableList.of(record, record),
readRecordsAsList(table.schema(), dataFile.path()));
- // Check records in the eq-delete file.
- DeleteFile eqDeleteFile = result.deleteFiles()[0];
- Assert.assertEquals(ImmutableList.of(record),
readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));
-
// Check records in the pos-delete file.
- DeleteFile posDeleteFile = result.deleteFiles()[1];
+ DeleteFile posDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(ImmutableList.of(
posRecord.copy("file_path", dataFile.path(), "pos", 0L)
), readRecordsAsList(DeleteSchemaUtil.pathPosSchema(),
posDeleteFile.path()));
@@ -305,7 +301,6 @@ public class TestTaskEqualityDeltaWriter extends
TableTestBase {
DeleteFile eqDeleteFile = result.deleteFiles()[0];
Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
Assert.assertEquals(ImmutableList.of(
- keyFunc.apply("aaa"),
keyFunc.apply("aaa"),
keyFunc.apply("ccc"),
keyFunc.apply("bbb")
@@ -389,7 +384,6 @@ public class TestTaskEqualityDeltaWriter extends
TableTestBase {
Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content());
Assert.assertEquals(ImmutableList.of(
createRecord(3, "aaa"),
- createRecord(5, "aaa"),
createRecord(4, "ccc"),
createRecord(2, "bbb")
), readRecordsAsList(eqDeleteRowSchema, eqDeleteFile.path()));
@@ -467,6 +461,7 @@ public class TestTaskEqualityDeltaWriter extends
TableTestBase {
deltaWriter.delete(row);
}
+ // The caller of this function is responsible for passing in a record with
only the key fields
public void deleteKey(Record key) throws IOException {
deltaWriter.deleteKey(key);
}
@@ -485,6 +480,11 @@ public class TestTaskEqualityDeltaWriter extends
TableTestBase {
protected StructLike asStructLike(Record row) {
return row;
}
+
+ @Override
+ protected StructLike asStructLikeKey(Record data) {
+ return data;
+ }
}
}
diff --git
a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db..f4eeb8d68 100644
---
a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++
b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -103,5 +103,10 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ return wrapper.wrap(data);
+ }
}
}
diff --git
a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 79009dea7..5a3cdae7e 100644
---
a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++
b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -135,7 +135,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
WriteResult result = writer.complete();
Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
- Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
+ Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
commitTransaction(result);
Assert.assertEquals("Should have expected records.", expectedRowSet(
@@ -305,13 +305,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
writer.write(createInsert(1, "aaa"));
writer.write(createInsert(2, "aaa"));
- writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
+ writer.write(createDelete(2, "aaa")); // 1 pos-delete.
WriteResult result = writer.complete();
Assert.assertEquals(1, result.dataFiles().length);
- Assert.assertEquals(2, result.deleteFiles().length);
- Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES,
FileContent.POSITION_DELETES),
- Sets.newHashSet(result.deleteFiles()[0].content(),
result.deleteFiles()[1].content()));
+ Assert.assertEquals(1, result.deleteFiles().length);
+ Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
+ Sets.newHashSet(result.deleteFiles()[0].content()));
commitTransaction(result);
Assert.assertEquals("Should have expected records", expectedRowSet(
diff --git
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db..356b7346d 100644
---
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
+ private final RowDataWrapper keyWrapper;
+ private final RowDataProjection keyProjection;
private final boolean upsert;
BaseDeltaTaskWriter(PartitionSpec spec,
@@ -58,6 +62,8 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
this.deleteSchema = TypeUtil.select(schema,
Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.upsert = upsert;
+ this.keyWrapper = new
RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+ this.keyProjection = RowDataProjection.create(schema, deleteSchema);
}
abstract RowDataDeltaWriter route(RowData row);
@@ -74,7 +80,7 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
- writer.delete(row);
+ writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
@@ -103,5 +109,10 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ return keyWrapper.wrap(data);
+ }
}
}
diff --git
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 284910085..514430502 100644
---
a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++
b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;
public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec);
+ } else if (upsert) {
+ // In upsert mode, only the new row is emitted using INSERT row kind.
Therefore, any column of the inserted row
+ // may differ from the deleted row other than the primary key fields,
and the delete file must contain values
+ // that are correct for the deleted row. Therefore, only write the
equality delete fields.
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec,
+ ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema,
Sets.newHashSet(equalityFieldIds)), null);
} else {
- // TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
diff --git
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 000000000..6ec35e2ff
--- /dev/null
+++
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private final boolean isStreamingJob;
+ private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+ private TableEnvironment tEnv;
+
+ public TestFlinkUpsert(String catalogName, Namespace baseNamespace,
FileFormat format, Boolean isStreamingJob) {
+ super(catalogName, baseNamespace);
+ this.isStreamingJob = isStreamingJob;
+ tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+ tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+ tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+ }
+
+ @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1},
format={2}, isStreaming={3}")
+ public static Iterable<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format : new FileFormat[] {FileFormat.PARQUET,
FileFormat.AVRO, FileFormat.ORC}) {
+ for (Boolean isStreaming : new Boolean[] {true, false}) {
+ // Only test with one catalog as this is a file operation concern.
+ // FlinkCatalogTestBase requires the catalog name start with
testhadoop if using hadoop catalog.
+ String catalogName = "testhadoop";
+ Namespace baseNamespace = Namespace.of("default");
+ parameters.add(new Object[] {catalogName, baseNamespace, format,
isStreaming});
+ }
+ }
+ return parameters;
+ }
+
+ @Override
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+ .newInstance();
+ if (isStreamingJob) {
+ settingsBuilder.inStreamingMode();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+
.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ env.enableCheckpointing(400);
+ env.setMaxParallelism(2);
+ env.setParallelism(2);
+ tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+ } else {
+ settingsBuilder.inBatchMode();
+ tEnv = TableEnvironment.create(settingsBuilder.build());
+ }
+ }
+ }
+ return tEnv;
+ }
+
+ @Override
+ @Before
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ }
+
+ @Override
+ @After
+ public void clean() {
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
+ }
+
+ @Test
+ public void testUpsertAndQuery() {
+ String tableName = "test_upsert_query";
+ LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+ LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+ sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE,
PRIMARY KEY(id,province) NOT ENFORCED) " +
+ "PARTITIONED BY (province) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ try {
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'a', TO_DATE('2022-03-01'))," +
+ "(2, 'b', TO_DATE('2022-03-01'))," +
+ "(1, 'b', TO_DATE('2022-03-01'))",
+ tableName);
+
+ sql("INSERT INTO %s VALUES " +
+ "(4, 'a', TO_DATE('2022-03-02'))," +
+ "(5, 'b', TO_DATE('2022-03-02'))," +
+ "(1, 'b', TO_DATE('2022-03-02'))",
+ tableName);
+
+ List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b",
dt20220301), Row.of(1, "a", dt20220301));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
+ rowsOn20220301);
+
+ List<Row> rowsOn20220302 = Lists.newArrayList(
+ Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5,
"b", dt20220302));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
+ rowsOn20220302);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Iterables.concat(rowsOn20220301,
rowsOn20220302)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @Test
+ public void testPrimaryKeyEqualToPartitionKey() {
+ // This is an SQL based reproduction of
TestFlinkIcebergSinkV2#testUpsertOnDataKey
+ String tableName = "upsert_on_data_key";
+ try {
+ sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY
KEY(data) NOT ENFORCED) " +
+ "PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'aaa')," +
+ "(2, 'aaa')," +
+ "(3, 'bbb')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+ sql("INSERT INTO %s VALUES " +
+ "(4, 'aaa')," +
+ "(5, 'bbb')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+ sql("INSERT INTO %s VALUES " +
+ "(6, 'aaa')," +
+ "(7, 'bbb')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @Test
+ public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+ String tableName = "upsert_on_pk_at_schema_start";
+ LocalDate dt = LocalDate.of(2022, 3, 1);
+ try {
+ sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT,
PRIMARY KEY(data,dt) NOT ENFORCED) " +
+ "PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql("INSERT INTO %s VALUES " +
+ "('aaa', TO_DATE('2022-03-01'), 1)," +
+ "('aaa', TO_DATE('2022-03-01'), 2)," +
+ "('bbb', TO_DATE('2022-03-01'), 3)",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+
+ sql("INSERT INTO %s VALUES " +
+ "('aaa', TO_DATE('2022-03-01'), 4)," +
+ "('bbb', TO_DATE('2022-03-01'), 5)",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+
+ sql("INSERT INTO %s VALUES " +
+ "('aaa', TO_DATE('2022-03-01'), 6)," +
+ "('bbb', TO_DATE('2022-03-01'), 7)",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @Test
+ public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+ // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema,
but the primary key fields
+ // are located at the end of the flink schema.
+ String tableName = "upsert_on_pk_at_schema_end";
+ LocalDate dt = LocalDate.of(2022, 3, 1);
+ try {
+ sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL,
PRIMARY KEY(data,dt) NOT ENFORCED) " +
+ "PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql("INSERT INTO %s VALUES " +
+ "(1, 'aaa', TO_DATE('2022-03-01'))," +
+ "(2, 'aaa', TO_DATE('2022-03-01'))," +
+ "(3, 'bbb', TO_DATE('2022-03-01'))",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+
+ sql("INSERT INTO %s VALUES " +
+ "(4, 'aaa', TO_DATE('2022-03-01'))," +
+ "(5, 'bbb', TO_DATE('2022-03-01'))",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+
+ sql("INSERT INTO %s VALUES " +
+ "(6, 'aaa', TO_DATE('2022-03-01'))," +
+ "(7, 'bbb', TO_DATE('2022-03-01'))",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+}
diff --git
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 280e14807..34b7f0367 100644
---
a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++
b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -136,7 +136,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
WriteResult result = writer.complete();
Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
- Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
+ Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
commitTransaction(result);
Assert.assertEquals("Should have expected records.", expectedRowSet(
@@ -306,13 +306,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
writer.write(createInsert(1, "aaa"));
writer.write(createInsert(2, "aaa"));
- writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
+ writer.write(createDelete(2, "aaa")); // 1 pos-delete.
WriteResult result = writer.complete();
Assert.assertEquals(1, result.dataFiles().length);
- Assert.assertEquals(2, result.deleteFiles().length);
- Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES,
FileContent.POSITION_DELETES),
- Sets.newHashSet(result.deleteFiles()[0].content(),
result.deleteFiles()[1].content()));
+ Assert.assertEquals(1, result.deleteFiles().length);
+ Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
+ Sets.newHashSet(result.deleteFiles()[0].content()));
commitTransaction(result);
Assert.assertEquals("Should have expected records", expectedRowSet(
diff --git
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 8415129db..16262b22e 100644
---
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -28,7 +28,9 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -41,6 +43,8 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
+ private final RowDataWrapper keyWrapper;
+ private final RowDataProjection keyProjection;
private final boolean upsert;
BaseDeltaTaskWriter(PartitionSpec spec,
@@ -57,6 +61,8 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema,
Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ this.keyWrapper = new
RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+ this.keyProjection = RowDataProjection.create(schema, deleteSchema);
this.upsert = upsert;
}
@@ -74,7 +80,7 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
- writer.delete(row);
+ writer.deleteKey(keyProjection.wrap(row));
}
writer.write(row);
break;
@@ -103,5 +109,10 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
protected StructLike asStructLike(RowData data) {
return wrapper.wrap(data);
}
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ return keyWrapper.wrap(data);
+ }
}
}
diff --git
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 284910085..514430502 100644
---
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -35,6 +35,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ArrayUtil;
public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@@ -69,8 +71,13 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec);
+ } else if (upsert) {
+ // In upsert mode, only the new row is emitted using INSERT row kind.
Therefore, any column of the inserted row
+ // may differ from the deleted row other than the primary key fields,
and the delete file must contain values
+ // that are correct for the deleted row. Therefore, only write the
equality delete fields.
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec,
+ ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema,
Sets.newHashSet(equalityFieldIds)), null);
} else {
- // TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 9cce4a043..0c30b0916 100644
---
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.flink;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
@@ -28,6 +30,7 @@ import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
@@ -35,9 +38,11 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -58,6 +63,7 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase {
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ private static final String SOURCE_TABLE =
"default_catalog.default_database.bounded_source";
private static final String TABLE_NAME = "test_table";
private TableEnvironment tEnv;
private Table icebergTable;
@@ -127,6 +133,7 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ BoundedTableFactory.clearDataSets();
super.clean();
}
@@ -253,33 +260,37 @@ public class TestFlinkTableSink extends
FlinkCatalogTestBase {
"write.format.default", format.name(),
TableProperties.WRITE_DISTRIBUTION_MODE,
DistributionMode.HASH.modeName()
);
+
+ // Initialize a BoundedSource table to precisely emit those rows in only
one checkpoint.
+ List<Row> dataSet = IntStream.range(1, 1000)
+ .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"),
Row.of(i, "ccc")))
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ String dataId =
BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+ sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" +
+ " WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE,
dataId);
+ Assert.assertEquals("Should have the expected rows in source table.",
Sets.newHashSet(dataSet),
+ Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));
+
sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
tableName, toWithClause(tableProps));
try {
// Insert data set.
- sql("INSERT INTO %s VALUES " +
- "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
- "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
- "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+ sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
- Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
- SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
- SimpleDataUtil.createRecord(1, "aaa"),
- SimpleDataUtil.createRecord(1, "bbb"),
- SimpleDataUtil.createRecord(1, "ccc"),
- SimpleDataUtil.createRecord(2, "aaa"),
- SimpleDataUtil.createRecord(2, "bbb"),
- SimpleDataUtil.createRecord(2, "ccc"),
- SimpleDataUtil.createRecord(3, "aaa"),
- SimpleDataUtil.createRecord(3, "bbb"),
- SimpleDataUtil.createRecord(3, "ccc")
- ));
+ Assert.assertEquals("Should have the expected rows in sink table.",
Sets.newHashSet(dataSet),
+ Sets.newHashSet(sql("SELECT * FROM %s", tableName)));
// Sometimes we will have more than one checkpoint if we pass the auto
checkpoint interval,
// thus producing multiple snapshots. Here we assert that each snapshot
has only 1 file per partition.
+ Table table =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
Map<Long, List<DataFile>> snapshotToDataFiles =
SimpleDataUtil.snapshotToDataFiles(table);
for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
+ if (dataFiles.isEmpty()) {
+ continue;
+ }
+
Assert.assertEquals("There should be 1 data file in partition 'aaa'",
1,
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(),
ImmutableMap.of("data", "aaa")).size());
Assert.assertEquals("There should be 1 data file in partition 'bbb'",
1,
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
new file mode 100644
index 000000000..6ec35e2ff
--- /dev/null
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * SQL tests for Flink upserts into Iceberg format v2 tables with {@link TableProperties#UPSERT_ENABLED} set.
+ *
+ * <p>Every case runs against a Hadoop catalog for each combination of file format (Parquet, Avro, ORC)
+ * and execution mode (streaming or batch).
+ */
+@RunWith(Parameterized.class)
+public class TestFlinkUpsert extends FlinkCatalogTestBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final boolean isStreamingJob;
+  private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+
+  // Created lazily by getTableEnv(); volatile so the double-checked locking there safely
+  // publishes the fully constructed environment to other threads.
+  private volatile TableEnvironment tEnv;
+
+  public TestFlinkUpsert(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
+    super(catalogName, baseNamespace);
+    this.isStreamingJob = isStreamingJob;
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
+      for (Boolean isStreaming : new Boolean[] {true, false}) {
+        // Only test with one catalog as this is a file operation concern.
+        // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop catalog.
+        String catalogName = "testhadoop";
+        Namespace baseNamespace = Namespace.of("default");
+        parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+      }
+    }
+    return parameters;
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    // Double-checked locking: re-checking inside the lock ensures only one environment is ever
+    // created, even when several callers pass the first (unsynchronized) check concurrently.
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          tEnv = createTableEnv();
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  /**
+   * Creates the table environment for the configured mode. Streaming jobs use a checkpoint
+   * interval of 400 ms and a parallelism (and max parallelism) of 2; batch jobs use a plain batch environment.
+   */
+  private TableEnvironment createTableEnv() {
+    EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
+    if (isStreamingJob) {
+      settingsBuilder.inStreamingMode();
+      StreamExecutionEnvironment env = StreamExecutionEnvironment
+          .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+      env.enableCheckpointing(400);
+      env.setMaxParallelism(2);
+      env.setParallelism(2);
+      return StreamTableEnvironment.create(env, settingsBuilder.build());
+    }
+
+    settingsBuilder.inBatchMode();
+    return TableEnvironment.create(settingsBuilder.build());
+  }
+
+  @Override
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @Override
+  @After
+  public void clean() {
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  /**
+   * Upserts rows keyed by (id, province) with two INSERTs. The second INSERT rewrites key (1, 'b'), so only
+   * its 2022-03-02 version may be visible afterwards, whether the scan is filtered by date or not.
+   */
+  @Test
+  public void testUpsertAndQuery() {
+    String tableName = "test_upsert_query";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+
+    sql("CREATE TABLE %s(id INT NOT NULL, province STRING NOT NULL, dt DATE, PRIMARY KEY(id,province) NOT ENFORCED) " +
+        "PARTITIONED BY (province) WITH %s",
+        tableName, toWithClause(tableUpsertProps));
+
+    try {
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'a', TO_DATE('2022-03-01'))," +
+          "(2, 'b', TO_DATE('2022-03-01'))," +
+          "(1, 'b', TO_DATE('2022-03-01'))",
+          tableName);
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'a', TO_DATE('2022-03-02'))," +
+          "(5, 'b', TO_DATE('2022-03-02'))," +
+          "(1, 'b', TO_DATE('2022-03-02'))",
+          tableName);
+
+      // (1, 'b', 2022-03-01) was overwritten by the second INSERT, so it must not come back.
+      List<Row> rowsOn20220301 = Lists.newArrayList(Row.of(2, "b", dt20220301), Row.of(1, "a", dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
+          rowsOn20220301);
+
+      List<Row> rowsOn20220302 = Lists.newArrayList(
+          Row.of(1, "b", dt20220302), Row.of(4, "a", dt20220302), Row.of(5, "b", dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
+          rowsOn20220302);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, rowsOn20220302)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  /**
+   * Uses the partition column {@code data} as the whole primary key, so after each INSERT exactly one row per
+   * {@code data} value remains: the last one written for that key.
+   */
+  @Test
+  public void testPrimaryKeyEqualToPartitionKey() {
+    // This is an SQL based reproduction of TestFlinkIcebergSinkV2#testUpsertOnDataKey
+    String tableName = "upsert_on_data_key";
+    try {
+      sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL, PRIMARY KEY(data) NOT ENFORCED) " +
+          "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa')," +
+          "(2, 'aaa')," +
+          "(3, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa"), Row.of(3, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa')," +
+          "(5, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa"), Row.of(5, "bbb")));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa')," +
+          "(7, 'bbb')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa"), Row.of(7, "bbb")));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  /**
+   * Upserts into a table whose composite primary key (data, dt) occupies the first columns of the schema;
+   * each INSERT must replace the previous row for every key.
+   */
+  @Test
+  public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+    String tableName = "upsert_on_pk_at_schema_start";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(data STRING NOT NULL, dt DATE NOT NULL, id INT, PRIMARY KEY(data,dt) NOT ENFORCED) " +
+          "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 1)," +
+          "('aaa', TO_DATE('2022-03-01'), 2)," +
+          "('bbb', TO_DATE('2022-03-01'), 3)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 2), Row.of("bbb", dt, 3)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 4)," +
+          "('bbb', TO_DATE('2022-03-01'), 5)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 4), Row.of("bbb", dt, 5)));
+
+      sql("INSERT INTO %s VALUES " +
+          "('aaa', TO_DATE('2022-03-01'), 6)," +
+          "('bbb', TO_DATE('2022-03-01'), 7)",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("aaa", dt, 6), Row.of("bbb", dt, 7)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  /**
+   * Same scenario as {@link #testPrimaryKeyFieldsAtBeginningOfSchema()}, but with the primary key fields
+   * (data, dt) placed at the end of the Flink schema.
+   */
+  @Test
+  public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+    String tableName = "upsert_on_pk_at_schema_end";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    try {
+      sql("CREATE TABLE %s(id INT, data STRING NOT NULL, dt DATE NOT NULL, PRIMARY KEY(data,dt) NOT ENFORCED) " +
+          "PARTITIONED BY (data) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa', TO_DATE('2022-03-01'))," +
+          "(2, 'aaa', TO_DATE('2022-03-01'))," +
+          "(3, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(2, "aaa", dt), Row.of(3, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(4, 'aaa', TO_DATE('2022-03-01'))," +
+          "(5, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(4, "aaa", dt), Row.of(5, "bbb", dt)));
+
+      sql("INSERT INTO %s VALUES " +
+          "(6, 'aaa', TO_DATE('2022-03-01'))," +
+          "(7, 'bbb', TO_DATE('2022-03-01'))",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(6, "aaa", dt), Row.of(7, "bbb", dt)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+}
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 280e14807..e30412ad8 100644
---
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -136,7 +136,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
WriteResult result = writer.complete();
Assert.assertEquals(partitioned ? 7 : 1, result.dataFiles().length);
- Assert.assertEquals(partitioned ? 6 : 2, result.deleteFiles().length);
+ Assert.assertEquals(partitioned ? 3 : 1, result.deleteFiles().length);
commitTransaction(result);
Assert.assertEquals("Should have expected records.", expectedRowSet(
@@ -306,13 +306,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
writer.write(createInsert(1, "aaa"));
writer.write(createInsert(2, "aaa"));
- writer.write(createDelete(2, "aaa")); // 1 pos-delete and 1 eq-delete.
+ writer.write(createDelete(2, "aaa")); // 1 pos-delete.
WriteResult result = writer.complete();
Assert.assertEquals(1, result.dataFiles().length);
- Assert.assertEquals(2, result.deleteFiles().length);
- Assert.assertEquals(Sets.newHashSet(FileContent.EQUALITY_DELETES,
FileContent.POSITION_DELETES),
- Sets.newHashSet(result.deleteFiles()[0].content(),
result.deleteFiles()[1].content()));
+ Assert.assertEquals(1, result.deleteFiles().length);
+ Assert.assertEquals(Sets.newHashSet(FileContent.POSITION_DELETES),
+ Sets.newHashSet(result.deleteFiles()[0].content()));
commitTransaction(result);
Assert.assertEquals("Should have expected records", expectedRowSet(
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
index a039be9d5..b0041c3bc 100644
---
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -107,12 +109,25 @@ public class BoundedTableFactory implements
DynamicTableSourceFactory {
@Override
public ChangelogMode getChangelogMode() {
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.DELETE)
- .addContainedKind(RowKind.UPDATE_BEFORE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .build();
+ Supplier<Stream<Row>> supplier = () ->
elementsPerCheckpoint.stream().flatMap(List::stream);
+
+ // Add the INSERT row kind by default.
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT);
+
+ if (supplier.get().anyMatch(r -> r.getKind() == RowKind.DELETE)) {
+ builder.addContainedKind(RowKind.DELETE);
+ }
+
+ if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_BEFORE)) {
+ builder.addContainedKind(RowKind.UPDATE_BEFORE);
+ }
+
+ if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_AFTER)) {
+ builder.addContainedKind(RowKind.UPDATE_AFTER);
+ }
+
+ return builder.build();
}
@Override
@@ -120,7 +135,8 @@ public class BoundedTableFactory implements
DynamicTableSourceFactory {
return new DataStreamScanProvider() {
@Override
public DataStream<RowData>
produceDataStream(StreamExecutionEnvironment env) {
- SourceFunction<Row> source = new
BoundedTestSource<>(elementsPerCheckpoint);
+ boolean checkpointEnabled =
env.getCheckpointConfig().isCheckpointingEnabled();
+ SourceFunction<Row> source = new
BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);
RowType rowType = (RowType)
tableSchema.toRowDataType().getLogicalType();
// Converter to convert the Row to RowData.
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
index 6f6712dea..54e44ee5b 100644
---
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
* A stream source that:
@@ -39,6 +40,7 @@ import
org.apache.flink.streaming.api.functions.source.SourceFunction;
public final class BoundedTestSource<T> implements SourceFunction<T>,
CheckpointListener {
private final List<List<T>> elementsPerCheckpoint;
+ private final boolean checkpointEnabled;
private volatile boolean running = true;
private final AtomicInteger numCheckpointsComplete = new AtomicInteger(0);
@@ -46,8 +48,13 @@ public final class BoundedTestSource<T> implements
SourceFunction<T>, Checkpoint
/**
* Emits all those elements in several checkpoints.
*/
- public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+ public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean
checkpointEnabled) {
this.elementsPerCheckpoint = elementsPerCheckpoint;
+ this.checkpointEnabled = checkpointEnabled;
+ }
+
+ public BoundedTestSource(List<List<T>> elementsPerCheckpoint) {
+ this(elementsPerCheckpoint, true);
}
/**
@@ -59,7 +66,14 @@ public final class BoundedTestSource<T> implements
SourceFunction<T>, Checkpoint
@Override
public void run(SourceContext<T> ctx) throws Exception {
- for (int checkpoint = 0; checkpoint < elementsPerCheckpoint.size();
checkpoint++) {
+ if (!checkpointEnabled) {
+ Preconditions.checkArgument(elementsPerCheckpoint.size() <= 1,
+ "There should be at most one list in the elementsPerCheckpoint
when checkpoint is disabled.");
+
elementsPerCheckpoint.stream().flatMap(List::stream).forEach(ctx::collect);
+ return;
+ }
+
+ for (List<T> elements : elementsPerCheckpoint) {
final int checkpointToAwait;
synchronized (ctx.getCheckpointLock()) {
@@ -70,7 +84,7 @@ public final class BoundedTestSource<T> implements
SourceFunction<T>, Checkpoint
// affected in the end. Setting the delta to be 2 is introducing the
variable that produce un-continuous
// checkpoints that emit the records buffer from
elementsPerCheckpoints.
checkpointToAwait = numCheckpointsComplete.get() + 2;
- for (T element : elementsPerCheckpoint.get(checkpoint)) {
+ for (T element : elements) {
ctx.collect(element);
}
}