This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f858348fc18cd532b9de1f53b055f4983373b46c
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Apr 14 17:34:16 2021 +0200

    [hotfix][tests] Minor debuggability improvements to CoordinatedSourceRescaleITCase
---
 .../base/source/reader/CoordinatedSourceRescaleITCase.java    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
index 7ca15a3..f9e0e27 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -49,9 +50,11 @@ import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStor
 import static org.junit.Assert.assertThat;
 
 /** Tests if the coordinator handles up and downscaling. */
-public class CoordinatedSourceRescaleITCase {
+public class CoordinatedSourceRescaleITCase extends TestLogger {
+
     public static final String CREATED_CHECKPOINT = "successfully created checkpoint";
     public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint";
+
     @Rule public final TemporaryFolder temp = new TemporaryFolder();
 
     @Test
@@ -128,7 +131,8 @@ public class CoordinatedSourceRescaleITCase {
     private static class FailingMapFunction extends RichMapFunction<Long, Long>
             implements CheckpointListener {
         private static final long serialVersionUID = 699621912578369378L;
-        private boolean generateCheckpoint;
+        private final boolean generateCheckpoint;
+        private boolean processedRecord;
 
         FailingMapFunction(boolean generateCheckpoint) {
             this.generateCheckpoint = generateCheckpoint;
@@ -136,6 +140,7 @@ public class CoordinatedSourceRescaleITCase {
 
         @Override
         public Long map(Long value) throws Exception {
+            processedRecord = true;
             // run a bit before failing
             if (!generateCheckpoint && value % 100 == 42) {
                 throw new Exception(RESTORED_CHECKPOINT);
@@ -145,7 +150,7 @@ public class CoordinatedSourceRescaleITCase {
 
         @Override
         public void notifyCheckpointComplete(long checkpointId) throws Exception {
-            if (generateCheckpoint && checkpointId > 5) {
+            if (generateCheckpoint && processedRecord && checkpointId > 5) {
                 throw new Exception(CREATED_CHECKPOINT);
             }
         }

Reply via email to