Repository: flink
Updated Branches:
  refs/heads/master 3b0321aee -> d29bed383


[FLINK-7268] Add delaying executor in *EventTimeWindowCheckpointingITCase

This helps tease out races, for example the recently discovered one in
cleanup of incremental state handles at the SharedStateRegistry.

(cherry picked from commit d7683cc)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d29bed38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d29bed38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d29bed38

Branch: refs/heads/master
Commit: d29bed38311f7a01d2241fbf8fa26eac7f012f53
Parents: 91a4b27
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Jul 28 15:01:35 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 15 14:56:54 2017 +0200

----------------------------------------------------------------------
 ...bstractEventTimeWindowCheckpointingITCase.java | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d29bed38/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index c525a37..4d5fa71 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -31,6 +31,8 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -62,6 +64,9 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
@@ -139,7 +144,18 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haDir.toURI().toString());
                }
 
-               cluster = new LocalFlinkMiniCluster(config, false);
+               // purposefully delay in the executor to tease out races
+               final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
+               HighAvailabilityServices haServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+                       config,
+                       new Executor() {
+                               @Override
+                               public void execute(Runnable command) {
+                                       executor.schedule(command, 500, 
MILLISECONDS);
+                               }
+                       });
+
+               cluster = new LocalFlinkMiniCluster(config, haServices, false);
                cluster.start();
 
                env = new TestStreamEnvironment(cluster, PARALLELISM);

Reply via email to