This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0323308566 Flink: Backport #10200 to v1.19 and v1.17 (#10259)
0323308566 is described below
commit 032330856631ebeb22718acbc64fb19011f2f064
Author: pvary <[email protected]>
AuthorDate: Wed May 1 20:46:38 2024 +0200
Flink: Backport #10200 to v1.19 and v1.17 (#10259)
---
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 3 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 35 ++++++++++++++++++++++
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 3 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 35 ++++++++++++++++++++++
4 files changed, 74 insertions(+), 2 deletions(-)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 40e0b5f2a3..e8a46c5bec 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
RowDataDeltaWriter(PartitionKey partition) {
- super(partition, schema, deleteSchema);
+ super(partition, schema, deleteSchema, DeleteGranularity.FILE);
}
@Override
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index b5c3bcf417..5e81c279b6 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,12 +18,17 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
@@ -45,6 +50,7 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -61,6 +67,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+ @Rule public final Timeout globalTimeout = Timeout.seconds(60);
+
@Parameterized.Parameters(
      name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
public static Object[][] parameters() {
@@ -233,4 +241,31 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
public void testUpsertOnIdDataKey() throws Exception {
testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
+
+ @Test
+ public void testDeleteStats() throws Exception {
+ assumeThat(format).isNotEqualTo(FileFormat.AVRO);
+
+ List<List<Row>> elementsPerCheckpoint =
+ ImmutableList.of(
+ // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));
+
+    List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));
+
+ testChangeLogs(
+ ImmutableList.of("id", "data"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ "main");
+
+    DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
+    String fromStat =
+        new String(
+            deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
+    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
+ assumeThat(fromStat).isEqualTo(dataFile.path().toString());
+ }
}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 40e0b5f2a3..e8a46c5bec 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
RowDataDeltaWriter(PartitionKey partition) {
- super(partition, schema, deleteSchema);
+ super(partition, schema, deleteSchema, DeleteGranularity.FILE);
}
@Override
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index b5c3bcf417..5e81c279b6 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,12 +18,17 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
@@ -45,6 +50,7 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -61,6 +67,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+ @Rule public final Timeout globalTimeout = Timeout.seconds(60);
+
@Parameterized.Parameters(
      name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
public static Object[][] parameters() {
@@ -233,4 +241,31 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
public void testUpsertOnIdDataKey() throws Exception {
testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}
+
+ @Test
+ public void testDeleteStats() throws Exception {
+ assumeThat(format).isNotEqualTo(FileFormat.AVRO);
+
+ List<List<Row>> elementsPerCheckpoint =
+ ImmutableList.of(
+ // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));
+
+    List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));
+
+ testChangeLogs(
+ ImmutableList.of("id", "data"),
+ row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ "main");
+
+    DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
+    String fromStat =
+        new String(
+            deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
+    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
+ assumeThat(fromStat).isEqualTo(dataFile.path().toString());
+ }
}