This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f858348fc18cd532b9de1f53b055f4983373b46c Author: Stephan Ewen <se...@apache.org> AuthorDate: Wed Apr 14 17:34:16 2021 +0200 [hotfix][tests] Minor debuggability improvements to CoordinatedSourceRescaleITCase --- .../base/source/reader/CoordinatedSourceRescaleITCase.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java index 7ca15a3..f9e0e27 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.TestLogger; import org.junit.Rule; import org.junit.Test; @@ -49,9 +50,11 @@ import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStor import static org.junit.Assert.assertThat; /** Tests if the coordinator handles up and downscaling. */ -public class CoordinatedSourceRescaleITCase { +public class CoordinatedSourceRescaleITCase extends TestLogger { + public static final String CREATED_CHECKPOINT = "successfully created checkpoint"; public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint"; + @Rule public final TemporaryFolder temp = new TemporaryFolder(); @Test @@ -128,7 +131,8 @@ public class CoordinatedSourceRescaleITCase { private static class FailingMapFunction extends RichMapFunction<Long, Long> implements CheckpointListener { private static final long serialVersionUID = 699621912578369378L; - private boolean generateCheckpoint; + private final boolean generateCheckpoint; + private boolean processedRecord; FailingMapFunction(boolean generateCheckpoint) { this.generateCheckpoint = generateCheckpoint; @@ -136,6 +140,7 @@ public class CoordinatedSourceRescaleITCase { @Override public Long map(Long value) throws Exception { + processedRecord = true; // run a bit before failing if (!generateCheckpoint && value % 100 == 42) { throw new Exception(RESTORED_CHECKPOINT); @@ -145,7 +150,7 @@ public class CoordinatedSourceRescaleITCase { @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (generateCheckpoint && checkpointId > 5) { + if (generateCheckpoint && processedRecord && checkpointId > 5) { throw new Exception(CREATED_CHECKPOINT); } }