This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b7d3a7f9d3 Flink: Fix bounded source state restore record duplication 
(#10208)
b7d3a7f9d3 is described below

commit b7d3a7f9d33aae2a85f9a742ed1ab87608283d28
Author: pvary <[email protected]>
AuthorDate: Fri Apr 26 12:30:16 2024 +0200

    Flink: Fix bounded source state restore record duplication (#10208)
---
 .../apache/iceberg/flink/source/IcebergSource.java |   8 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |   6 +-
 .../flink/source/TestIcebergSourceFailover.java    | 107 +++++++++++++++++----
 ...cebergSourceFailoverWithWatermarkExtractor.java |  15 +--
 4 files changed, 99 insertions(+), 37 deletions(-)

diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0655cf87a9..8ec92c8c2c 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -201,8 +201,12 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       return new ContinuousIcebergEnumerator(
           enumContext, assigner, scanContext, splitPlanner, enumState);
     } else {
-      List<IcebergSourceSplit> splits = 
planSplitsForBatch(planningThreadName());
-      assigner.onDiscoveredSplits(splits);
+      if (enumState == null) {
+        // Only do scan planning if nothing is restored from checkpoint state
+        List<IcebergSourceSplit> splits = 
planSplitsForBatch(planningThreadName());
+        assigner.onDiscoveredSplits(splits);
+      }
+
       return new StaticIcebergEnumerator(enumContext, assigner);
     }
   }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index f48764f772..ce6caca121 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -284,11 +284,7 @@ public class SimpleDataUtil {
   public static void assertTableRecords(Table table, List<Record> expected, 
Duration timeout) {
     Awaitility.await("expected list of records should be produced")
         .atMost(timeout)
-        .untilAsserted(
-            () -> {
-              equalsRecords(expected, tableRecords(table), table.schema());
-              assertRecordsEqual(expected, tableRecords(table), 
table.schema());
-            });
+        .untilAsserted(() -> assertRecordsEqual(expected, tableRecords(table), 
table.schema()));
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) 
throws IOException {
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7d991ee603..7047a62a2c 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.flink.SimpleDataUtil.tableRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -29,7 +32,9 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -40,10 +45,12 @@ import 
org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
@@ -57,7 +64,10 @@ import org.junit.rules.TemporaryFolder;
 
 public class TestIcebergSourceFailover {
 
-  private static final int PARALLELISM = 4;
+  // Parallelism higher than 1, but lower than the number of splits used by 
some of our tests
+  // The goal is to allow some splits to remain in the enumerator when 
restoring the state
+  private static final int PARALLELISM = 2;
+  private static final int DO_NOT_FAIL = Integer.MAX_VALUE;
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
 
@@ -87,6 +97,10 @@ public class TestIcebergSourceFailover {
     return IcebergSource.forRowData()
         .tableLoader(sourceTableResource.tableLoader())
         .assignerFactory(new SimpleSplitAssignerFactory())
+        // Prevent combining splits
+        .set(
+            FlinkReadOptions.SPLIT_FILE_OPEN_COST,
+            Long.toString(TableProperties.SPLIT_SIZE_DEFAULT))
         .flinkConfig(config);
   }
 
@@ -103,6 +117,55 @@ public class TestIcebergSourceFailover {
     SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout);
   }
 
+  @Test
+  public void testBoundedWithSavepoint() throws Exception {
+    List<Record> expectedRecords = Lists.newArrayList();
+    Table sinkTable = sinkTableResource.table();
+    GenericAppenderHelper dataAppender =
+        new GenericAppenderHelper(
+            sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+    for (int i = 0; i < 4; ++i) {
+      List<Record> records = generateRecords(2, i);
+      expectedRecords.addAll(records);
+      dataAppender.appendToTable(records);
+    }
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    createBoundedStreams(env, 2);
+
+    JobClient jobClient = env.executeAsync("Bounded Iceberg Source Savepoint 
Test");
+    JobID jobId = jobClient.getJobID();
+
+    // Write something, but do not finish before the checkpoint is created
+    RecordCounterToFail.waitToFail();
+    CompletableFuture<String> savepoint =
+        miniClusterResource
+            .getClusterClient()
+            .stopWithSavepoint(
+                jobId,
+                false,
+                TEMPORARY_FOLDER.newFolder().toPath().toString(),
+                SavepointFormatType.CANONICAL);
+    RecordCounterToFail.continueProcessing();
+
+    // Wait for the job to stop with the savepoint
+    String savepointPath = savepoint.get();
+
+    // We expect that at least a few records have been written
+    assertThat(tableRecords(sinkTable)).hasSizeGreaterThan(0);
+
+    // New env from the savepoint
+    Configuration conf = new Configuration();
+    conf.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+    env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+    createBoundedStreams(env, DO_NOT_FAIL);
+
+    env.execute("Bounded Iceberg Source Savepoint Test");
+
+    // We expect no duplications
+    assertRecords(sinkTable, expectedRecords, Duration.ofSeconds(120));
+  }
+
   @Test
   public void testBoundedWithTaskManagerFailover() throws Exception {
     testBoundedIcebergSource(FailoverType.TM);
@@ -125,26 +188,8 @@ public class TestIcebergSourceFailover {
     }
 
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setParallelism(PARALLELISM);
     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
-
-    DataStream<RowData> stream =
-        env.fromSource(
-            sourceBuilder().build(),
-            WatermarkStrategy.noWatermarks(),
-            "IcebergSource",
-            TypeInformation.of(RowData.class));
-
-    DataStream<RowData> streamFailingInTheMiddleOfReading =
-        RecordCounterToFail.wrapWithFailureAfter(stream, 
expectedRecords.size() / 2);
-
-    // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
-    // exactly-once behavior. When Iceberg sink, we can verify end-to-end
-    // exactly-once. Here we mainly about source exactly-once behavior.
-    FlinkSink.forRowData(streamFailingInTheMiddleOfReading)
-        .table(sinkTableResource.table())
-        .tableLoader(sinkTableResource.tableLoader())
-        .append();
+    createBoundedStreams(env, expectedRecords.size() / 2);
 
     JobClient jobClient = env.executeAsync("Bounded Iceberg Source Failover 
Test");
     JobID jobId = jobClient.getJobID();
@@ -222,6 +267,28 @@ public class TestIcebergSourceFailover {
     assertRecords(sinkTableResource.table(), expectedRecords, 
Duration.ofSeconds(120));
   }
 
+  private void createBoundedStreams(StreamExecutionEnvironment env, int 
failAfter) {
+    env.setParallelism(PARALLELISM);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            sourceBuilder().build(),
+            WatermarkStrategy.noWatermarks(),
+            "IcebergSource",
+            TypeInformation.of(RowData.class));
+
+    DataStream<RowData> streamFailingInTheMiddleOfReading =
+        RecordCounterToFail.wrapWithFailureAfter(stream, failAfter);
+
+    // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
+    // exactly-once behavior. When using the Iceberg sink, we can verify end-to-end
+    // exactly-once. Here we mainly care about source exactly-once behavior.
+    FlinkSink.forRowData(streamFailingInTheMiddleOfReading)
+        .table(sinkTableResource.table())
+        .tableLoader(sinkTableResource.tableLoader())
+        .append();
+  }
+
   // ------------------------------------------------------------------------
   // test utilities copied from Flink's FileSourceTextLinesITCase
   // ------------------------------------------------------------------------
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
index f7dc931c50..27a8894ad4 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
@@ -88,16 +88,11 @@ public class 
TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
     Awaitility.await("expected list of records should be produced")
         .atMost(timeout)
         .untilAsserted(
-            () -> {
-              SimpleDataUtil.equalsRecords(
-                  expectedNormalized,
-                  
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
-                  table.schema());
-              SimpleDataUtil.assertRecordsEqual(
-                  expectedNormalized,
-                  
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
-                  table.schema());
-            });
+            () ->
+                SimpleDataUtil.assertRecordsEqual(
+                    expectedNormalized,
+                    
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+                    table.schema()));
   }
 
   private List<Record> convertLocalDateTimeToMilli(List<Record> records) {

Reply via email to