This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 0323308566 Flink: Backport #10200 to v1.19 and v1.17 (#10259)
0323308566 is described below

commit 032330856631ebeb22718acbc64fb19011f2f064
Author: pvary <[email protected]>
AuthorDate: Wed May 1 20:46:38 2024 +0200

    Flink: Backport #10200 to v1.19 and v1.17 (#10259)
---
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  3 +-
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 35 ++++++++++++++++++++++
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  3 +-
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 35 ++++++++++++++++++++++
 4 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 40e0b5f2a3..e8a46c5bec 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema);
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE);
     }
 
     @Override
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index b5c3bcf417..5e81c279b6 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,12 +18,17 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assumptions.assumeThat;
+
 import java.util.List;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
@@ -45,6 +50,7 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -61,6 +67,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
+  @Rule public final Timeout globalTimeout = Timeout.seconds(60);
+
   @Parameterized.Parameters(
       name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
   public static Object[][] parameters() {
@@ -233,4 +241,31 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public void testUpsertOnIdDataKey() throws Exception {
     testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
+
+  @Test
+  public void testDeleteStats() throws Exception {
+    assumeThat(format).isNotEqualTo(FileFormat.AVRO);
+
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));
+
+    List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));
+
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        "main");
+
+    DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
+    String fromStat =
+        new String(
+            deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
+    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
+    assumeThat(fromStat).isEqualTo(dataFile.path().toString());
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 40e0b5f2a3..e8a46c5bec 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema);
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE);
     }
 
     @Override
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index b5c3bcf417..5e81c279b6 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,12 +18,17 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assumptions.assumeThat;
+
 import java.util.List;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
@@ -45,6 +50,7 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -61,6 +67,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
+  @Rule public final Timeout globalTimeout = Timeout.seconds(60);
+
   @Parameterized.Parameters(
       name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
   public static Object[][] parameters() {
@@ -233,4 +241,31 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public void testUpsertOnIdDataKey() throws Exception {
     testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
+
+  @Test
+  public void testDeleteStats() throws Exception {
+    assumeThat(format).isNotEqualTo(FileFormat.AVRO);
+
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));
+
+    List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));
+
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        "main");
+
+    DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
+    String fromStat =
+        new String(
+            deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
+    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
+    assumeThat(fromStat).isEqualTo(dataFile.path().toString());
+  }
 }

Reply via email to