Repository: flink
Updated Branches:
  refs/heads/master 3b0321aee -> d29bed383
[FLINK-7268] Add delaying executor in *EventTimeWindowCheckpointingITCase

This helps tease out races, for example the recently discovered one in
cleanup of incremental state handles at the SharedStateRegistry.

(cherry picked from commit d7683cc)

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d29bed38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d29bed38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d29bed38

Branch: refs/heads/master
Commit: d29bed38311f7a01d2241fbf8fa26eac7f012f53
Parents: 91a4b27
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Jul 28 15:01:35 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 15 14:56:54 2017 +0200

----------------------------------------------------------------------
 ...bstractEventTimeWindowCheckpointingITCase.java | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d29bed38/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index c525a37..4d5fa71 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -31,6 +31,8 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -62,6 +64,9 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
@@ -139,7 +144,18 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
 		}
 
-		cluster = new LocalFlinkMiniCluster(config, false);
+		// purposefully delay in the executor to tease out races
+		final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
+		HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+			config,
+			new Executor() {
+				@Override
+				public void execute(Runnable command) {
+					executor.schedule(command, 500, MILLISECONDS);
+				}
+			});
+
+		cluster = new LocalFlinkMiniCluster(config, haServices, false);
 		cluster.start();
 
 		env = new TestStreamEnvironment(cluster, PARALLELISM);