This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b7d3a7f9d3 Flink: Fix bounded source state restore record duplication
(#10208)
b7d3a7f9d3 is described below
commit b7d3a7f9d33aae2a85f9a742ed1ab87608283d28
Author: pvary <[email protected]>
AuthorDate: Fri Apr 26 12:30:16 2024 +0200
Flink: Fix bounded source state restore record duplication (#10208)
---
.../apache/iceberg/flink/source/IcebergSource.java | 8 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 6 +-
.../flink/source/TestIcebergSourceFailover.java | 107 +++++++++++++++++----
...cebergSourceFailoverWithWatermarkExtractor.java | 15 +--
4 files changed, 99 insertions(+), 37 deletions(-)
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0655cf87a9..8ec92c8c2c 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -201,8 +201,12 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return new ContinuousIcebergEnumerator(
enumContext, assigner, scanContext, splitPlanner, enumState);
} else {
- List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
- assigner.onDiscoveredSplits(splits);
+ if (enumState == null) {
+ // Only do scan planning if nothing is restored from checkpoint state
+ List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
+ assigner.onDiscoveredSplits(splits);
+ }
+
return new StaticIcebergEnumerator(enumContext, assigner);
}
}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index f48764f772..ce6caca121 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -284,11 +284,7 @@ public class SimpleDataUtil {
public static void assertTableRecords(Table table, List<Record> expected,
Duration timeout) {
Awaitility.await("expected list of records should be produced")
.atMost(timeout)
- .untilAsserted(
- () -> {
- equalsRecords(expected, tableRecords(table), table.schema());
- assertRecordsEqual(expected, tableRecords(table),
table.schema());
- });
+ .untilAsserted(() -> assertRecordsEqual(expected, tableRecords(table),
table.schema()));
}
public static void assertTableRecords(Table table, List<Record> expected)
throws IOException {
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 7d991ee603..7047a62a2c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.flink.source;
+import static org.apache.iceberg.flink.SimpleDataUtil.tableRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -29,7 +32,9 @@ import
org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
import
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -40,10 +45,12 @@ import
org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
@@ -57,7 +64,10 @@ import org.junit.rules.TemporaryFolder;
public class TestIcebergSourceFailover {
- private static final int PARALLELISM = 4;
+ // Parallelism higher than 1, but lower than the number of splits used by
some of our tests
+ // The goal is to allow some splits to remain in the enumerator when
restoring the state
+ private static final int PARALLELISM = 2;
+ private static final int DO_NOT_FAIL = Integer.MAX_VALUE;
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
@@ -87,6 +97,10 @@ public class TestIcebergSourceFailover {
return IcebergSource.forRowData()
.tableLoader(sourceTableResource.tableLoader())
.assignerFactory(new SimpleSplitAssignerFactory())
+ // Prevent combining splits
+ .set(
+ FlinkReadOptions.SPLIT_FILE_OPEN_COST,
+ Long.toString(TableProperties.SPLIT_SIZE_DEFAULT))
.flinkConfig(config);
}
@@ -103,6 +117,55 @@ public class TestIcebergSourceFailover {
SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout);
}
+ @Test
+ public void testBoundedWithSavepoint() throws Exception {
+ List<Record> expectedRecords = Lists.newArrayList();
+ Table sinkTable = sinkTableResource.table();
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(
+ sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+ for (int i = 0; i < 4; ++i) {
+ List<Record> records = generateRecords(2, i);
+ expectedRecords.addAll(records);
+ dataAppender.appendToTable(records);
+ }
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ createBoundedStreams(env, 2);
+
+ JobClient jobClient = env.executeAsync("Bounded Iceberg Source Savepoint
Test");
+ JobID jobId = jobClient.getJobID();
+
+ // Write something, but do not finish before the checkpoint is created
+ RecordCounterToFail.waitToFail();
+ CompletableFuture<String> savepoint =
+ miniClusterResource
+ .getClusterClient()
+ .stopWithSavepoint(
+ jobId,
+ false,
+ TEMPORARY_FOLDER.newFolder().toPath().toString(),
+ SavepointFormatType.CANONICAL);
+ RecordCounterToFail.continueProcessing();
+
+ // Wait for the job to stop with the savepoint
+ String savepointPath = savepoint.get();
+
+ // We expect that at least a few records have been written
+ assertThat(tableRecords(sinkTable)).hasSizeGreaterThan(0);
+
+ // New env from the savepoint
+ Configuration conf = new Configuration();
+ conf.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+ env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ createBoundedStreams(env, DO_NOT_FAIL);
+
+ env.execute("Bounded Iceberg Source Savepoint Test");
+
+ // We expect no duplications
+ assertRecords(sinkTable, expectedRecords, Duration.ofSeconds(120));
+ }
+
@Test
public void testBoundedWithTaskManagerFailover() throws Exception {
testBoundedIcebergSource(FailoverType.TM);
@@ -125,26 +188,8 @@ public class TestIcebergSourceFailover {
}
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
-
- DataStream<RowData> stream =
- env.fromSource(
- sourceBuilder().build(),
- WatermarkStrategy.noWatermarks(),
- "IcebergSource",
- TypeInformation.of(RowData.class));
-
- DataStream<RowData> streamFailingInTheMiddleOfReading =
- RecordCounterToFail.wrapWithFailureAfter(stream,
expectedRecords.size() / 2);
-
- // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
- // exactly-once behavior. When Iceberg sink, we can verify end-to-end
- // exactly-once. Here we mainly about source exactly-once behavior.
- FlinkSink.forRowData(streamFailingInTheMiddleOfReading)
- .table(sinkTableResource.table())
- .tableLoader(sinkTableResource.tableLoader())
- .append();
+ createBoundedStreams(env, expectedRecords.size() / 2);
JobClient jobClient = env.executeAsync("Bounded Iceberg Source Failover
Test");
JobID jobId = jobClient.getJobID();
@@ -222,6 +267,28 @@ public class TestIcebergSourceFailover {
assertRecords(sinkTableResource.table(), expectedRecords,
Duration.ofSeconds(120));
}
+ private void createBoundedStreams(StreamExecutionEnvironment env, int
failAfter) {
+ env.setParallelism(PARALLELISM);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ sourceBuilder().build(),
+ WatermarkStrategy.noWatermarks(),
+ "IcebergSource",
+ TypeInformation.of(RowData.class));
+
+ DataStream<RowData> streamFailingInTheMiddleOfReading =
+ RecordCounterToFail.wrapWithFailureAfter(stream, failAfter);
+
+ // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
+ // exactly-once behavior. When using the Iceberg sink, we can verify end-to-end
+ // exactly-once. Here we mainly care about source exactly-once behavior.
+ FlinkSink.forRowData(streamFailingInTheMiddleOfReading)
+ .table(sinkTableResource.table())
+ .tableLoader(sinkTableResource.tableLoader())
+ .append();
+ }
+
// ------------------------------------------------------------------------
// test utilities copied from Flink's FileSourceTextLinesITCase
// ------------------------------------------------------------------------
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
index f7dc931c50..27a8894ad4 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
@@ -88,16 +88,11 @@ public class
TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
Awaitility.await("expected list of records should be produced")
.atMost(timeout)
.untilAsserted(
- () -> {
- SimpleDataUtil.equalsRecords(
- expectedNormalized,
-
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
- table.schema());
- SimpleDataUtil.assertRecordsEqual(
- expectedNormalized,
-
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
- table.schema());
- });
+ () ->
+ SimpleDataUtil.assertRecordsEqual(
+ expectedNormalized,
+
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+ table.schema()));
}
private List<Record> convertLocalDateTimeToMilli(List<Record> records) {