This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24baf7f39a8471f3544f670699a334d188a42ee9
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Wed Sep 1 17:35:35 2021 +0200

    [FLINK-24069][tests] Fail IgnoreInFlightData test when there is not enough data for it, for all checkpoints and not only the first one
---
 .../flink/test/checkpointing/IgnoreInFlightDataITCase.java    | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
index b8e4ba3..87c9141 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
@@ -258,6 +258,17 @@ public class IgnoreInFlightDataITCase extends TestLogger {
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+            Iterator<Integer> integerIterator = valueState.get().iterator();
+
+            if (!integerIterator.hasNext()
+                    || integerIterator.next() < PARALLELISM
+                    || (context.getCheckpointId() > 1
+                            && lastCheckpointValue.get().get() < PARALLELISM)) 
{
+                // Try to restart task.
+                throw new RuntimeException(
+                        "Not enough data to guarantee the in-flight data were 
generated before the first checkpoint");
+            }
+
             if (context.getCheckpointId() > 2) {
                 // It is possible if checkpoint was triggered too fast after 
restart.
                 return; // Just ignore it.

Reply via email to