[FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f40251 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f40251 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f40251 Branch: refs/heads/master Commit: b8f40251c6c45379118254c21b0d553c2d3b8937 Parents: 9173825 Author: Ufuk Celebi <u...@apache.org> Authored: Mon Feb 8 14:24:43 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 10 15:26:43 2016 +0100 ---------------------------------------------------------------------- .../runtime/minicluster/FlinkMiniCluster.scala | 20 ++++++++++++++++++-- .../minicluster/LocalFlinkMiniCluster.scala | 2 ++ .../runtime/testutils/ZooKeeperTestUtils.java | 5 +++-- .../runtime/testingUtils/TestingCluster.scala | 2 ++ .../kafka/KafkaTestEnvironmentImpl.java | 7 ++++--- .../kafka/KafkaTestEnvironmentImpl.java | 7 ++++--- ...ctTaskManagerProcessFailureRecoveryTest.java | 3 +++ .../JobManagerCheckpointRecoveryITCase.java | 8 ++++++-- .../recovery/ProcessFailureCancelingITCase.java | 2 +- 9 files changed, 43 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 4cdda3f..0346d6d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -42,7 +42,7 @@ import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor} import org.slf4j.LoggerFactory -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent._ import scala.concurrent.forkjoin.ForkJoinPool @@ -86,7 +86,7 @@ abstract class FlinkMiniCluster( implicit val executionContext = ExecutionContext.global - implicit val timeout = AkkaUtils.getTimeout(userConfiguration) + implicit val timeout = AkkaUtils.getTimeout(configuration) val recoveryMode = RecoveryMode.fromConfig(configuration) @@ -188,6 +188,22 @@ abstract class FlinkMiniCluster( AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort))) } + /** + * Sets CI environment (Travis) specific config defaults. + */ + def setDefaultCiConfig(config: Configuration) : Unit = { + // https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables + if (sys.env.contains("CI")) { + // Only set if nothing specified in config + if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) { + val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10 + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, s"${duration.toSeconds}s") + + LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s") + } + } + } + // -------------------------------------------------------------------------- // Start/Stop Methods // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 913aec0..c803429 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -48,6 +48,8 @@ class LocalFlinkMiniCluster( override def generateConfiguration(userConfiguration: Configuration): Configuration = { val config = getDefaultConfig + setDefaultCiConfig(config) + config.addAll(userConfiguration) setMemory(config) initializeIOFormatClasses(config) http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 6c33835..75569ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -70,9 +70,9 @@ public class ZooKeeperTestUtils { config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum); int connTimeout = 5000; - if (System.getenv().get("CI") != null) { + if (System.getenv().containsKey("CI")) { // The regular timeout is to aggressive for Travis and connections are often lost. - connTimeout = 20000; + connTimeout = 30000; } config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout); @@ -87,6 +87,7 @@ public class ZooKeeperTestUtils { config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 s"); return config; http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 5eee4e5..bd56040 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -63,6 +63,8 @@ class TestingCluster( cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10) cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1) + setDefaultCiConfig(cfg) + cfg.addAll(userConfig) cfg } http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 6f56ede..3d9e4f5 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -176,8 +176,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { standardProps.setProperty("bootstrap.servers", brokerConnectionString); standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("auto.commit.enable", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis. - standardProps.setProperty("zookeeper.connection.timeout.ms", "20000"); + standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis. + standardProps.setProperty("zookeeper.connection.timeout.ms", "30000"); standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8) standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) } @@ -292,7 +292,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); // for CI stability, increase zookeeper session timeout - kafkaProperties.put("zookeeper.session.timeout.ms", "20000"); + kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); + kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); final int numTries = 5; http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 50dcab8..9dfd021 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -175,8 +175,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { standardProps.setProperty("bootstrap.servers", brokerConnectionString); standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("auto.commit.enable", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis. - standardProps.setProperty("zookeeper.connection.timeout.ms", "20000"); + standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis. + standardProps.setProperty("zookeeper.connection.timeout.ms", "30000"); standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value) standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) } @@ -296,7 +296,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); // for CI stability, increase zookeeper session timeout - kafkaProperties.put("zookeeper.session.timeout.ms", "20000"); + kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); + kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); final int numTries = 5; http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 5dd870f..7b4c9b2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.NetUtils; @@ -122,6 +123,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); jmConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 s"); + jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress)); ActorRef jmActor = JobManager.startJobManagerActors( @@ -376,6 +378,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class); http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java index 737d39a..5d1b2c5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java @@ -156,6 +156,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper .getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString()); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); ActorSystem testSystem = null; JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; @@ -182,7 +183,8 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { leaderRetrievalService.start(leaderListener); // The task manager - taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + taskManagerSystem = AkkaUtils.createActorSystem( + config, Option.apply(new Tuple2<String, Object>("localhost", 0))); TaskManager.startTaskManagerComponentsAndActor( config, taskManagerSystem, "localhost", Option.<String>empty(), Option.<LeaderRetrievalService>empty(), @@ -297,6 +299,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { fileStateBackendPath); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2); + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; LeaderRetrievalService leaderRetrievalService = null; @@ -321,7 +324,8 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { leaderRetrievalService.start(leaderListener); // The task manager - taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + taskManagerSystem = AkkaUtils.createActorSystem( + config, Option.apply(new Tuple2<String, Object>("localhost", 0))); TaskManager.startTaskManagerComponentsAndActor( config, taskManagerSystem, "localhost", Option.<String>empty(), Option.<LeaderRetrievalService>empty(), http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 37e4e38..2b49c08 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -97,7 +97,7 @@ public class ProcessFailureCancelingITCase { jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s"); jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s"); jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10); - jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s"); + jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress)); ActorRef jmActor = JobManager.startJobManagerActors(