[FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f40251
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f40251
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f40251

Branch: refs/heads/master
Commit: b8f40251c6c45379118254c21b0d553c2d3b8937
Parents: 9173825
Author: Ufuk Celebi <u...@apache.org>
Authored: Mon Feb 8 14:24:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 10 15:26:43 2016 +0100

----------------------------------------------------------------------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 20 ++++++++++++++++++--
 .../minicluster/LocalFlinkMiniCluster.scala     |  2 ++
 .../runtime/testutils/ZooKeeperTestUtils.java   |  5 +++--
 .../runtime/testingUtils/TestingCluster.scala   |  2 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  7 ++++---
 .../kafka/KafkaTestEnvironmentImpl.java         |  7 ++++---
 ...ctTaskManagerProcessFailureRecoveryTest.java |  3 +++
 .../JobManagerCheckpointRecoveryITCase.java     |  8 ++++++--
 .../recovery/ProcessFailureCancelingITCase.java |  2 +-
 9 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 4cdda3f..0346d6d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, 
WebMonitor}
 
 import org.slf4j.LoggerFactory
 
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
 import scala.concurrent.forkjoin.ForkJoinPool
 
@@ -86,7 +86,7 @@ abstract class FlinkMiniCluster(
   
   implicit val executionContext = ExecutionContext.global
 
-  implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
+  implicit val timeout = AkkaUtils.getTimeout(configuration)
 
   val recoveryMode = RecoveryMode.fromConfig(configuration)
 
@@ -188,6 +188,22 @@ abstract class FlinkMiniCluster(
     AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
   }
 
+  /**
+    * Sets CI environment (Travis) specific config defaults.
+    */
+  def setDefaultCiConfig(config: Configuration) : Unit = {
+    // https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables
+    if (sys.env.contains("CI")) {
+      // Only set if nothing specified in config
+      if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) {
+        val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10
+        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
s"${duration.toSeconds}s")
+
+        LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s")
+      }
+    }
+  }
+
   // --------------------------------------------------------------------------
   //                          Start/Stop Methods
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 913aec0..c803429 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -48,6 +48,8 @@ class LocalFlinkMiniCluster(
   override def generateConfiguration(userConfiguration: Configuration): 
Configuration = {
     val config = getDefaultConfig
 
+    setDefaultCiConfig(config)
+
     config.addAll(userConfiguration)
     setMemory(config)
     initializeIOFormatClasses(config)

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 6c33835..75569ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -70,9 +70,9 @@ public class ZooKeeperTestUtils {
                config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
zooKeeperQuorum);
 
                int connTimeout = 5000;
-               if (System.getenv().get("CI") != null) {
+               if (System.getenv().containsKey("CI")) {
                        // The regular timeout is too aggressive for Travis and 
connections are often lost.
-                       connTimeout = 20000;
+                       connTimeout = 30000;
                }
 
                config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, 
connTimeout);
@@ -87,6 +87,7 @@ public class ZooKeeperTestUtils {
                config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"1000 ms");
                config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 
s");
                config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
                config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 
s");
 
                return config;

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 5eee4e5..bd56040 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -63,6 +63,8 @@ class TestingCluster(
     cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
     cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
+    setDefaultCiConfig(cfg)
+
     cfg.addAll(userConfig)
     cfg
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 6f56ede..3d9e4f5 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -176,8 +176,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
                standardProps.setProperty("auto.commit.enable", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", 
"12000"); // 6 seconds is default. Seems to be too small for travis.
-               standardProps.setProperty("zookeeper.connection.timeout.ms", 
"20000");
+               standardProps.setProperty("zookeeper.session.timeout.ms", 
"30000"); // 6 seconds is default. Seems to be too small for travis.
+               standardProps.setProperty("zookeeper.connection.timeout.ms", 
"30000");
                standardProps.setProperty("auto.offset.reset", "smallest"); // 
read from the beginning. (smallest is kafka 0.8)
                standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
        }
@@ -292,7 +292,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
 
                // for CI stability, increase zookeeper session timeout
-               kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+               kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
+               kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
 
                final int numTries = 5;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 50dcab8..9dfd021 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -175,8 +175,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
                standardProps.setProperty("auto.commit.enable", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", 
"12000"); // 6 seconds is default. Seems to be too small for travis.
-               standardProps.setProperty("zookeeper.connection.timeout.ms", 
"20000");
+               standardProps.setProperty("zookeeper.session.timeout.ms", 
"30000"); // 6 seconds is default. Seems to be too small for travis.
+               standardProps.setProperty("zookeeper.connection.timeout.ms", 
"30000");
                standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning. (earliest is kafka 0.9 value)
                standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
        }
@@ -296,7 +296,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
 
                // for CI stability, increase zookeeper session timeout
-               kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+               kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
+               kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
 
                final int numTries = 5;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5dd870f..7b4c9b2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
@@ -122,6 +123,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
                        
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
                        
jmConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 s");
+                       jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
"100 s");
 
                        jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<Tuple2<String, Object>>(localAddress));
                        ActorRef jmActor = JobManager.startJobManagerActors(
@@ -376,6 +378,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
                                
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
                                
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+                               cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
"100 s");
 
                                
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
index 737d39a..5d1b2c5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
@@ -156,6 +156,7 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
                Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
                                .getConnectString(), 
FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Parallelism);
+               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
                ActorSystem testSystem = null;
                JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
@@ -182,7 +183,8 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
                        leaderRetrievalService.start(leaderListener);
 
                        // The task manager
-                       taskManagerSystem = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+                       taskManagerSystem = AkkaUtils.createActorSystem(
+                                       config, Option.apply(new Tuple2<String, 
Object>("localhost", 0)));
                        TaskManager.startTaskManagerComponentsAndActor(
                                        config, taskManagerSystem, "localhost",
                                        Option.<String>empty(), 
Option.<LeaderRetrievalService>empty(),
@@ -297,6 +299,7 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
                                fileStateBackendPath);
 
                config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
+               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
                JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
                LeaderRetrievalService leaderRetrievalService = null;
@@ -321,7 +324,8 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
                        leaderRetrievalService.start(leaderListener);
 
                        // The task manager
-                       taskManagerSystem = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+                       taskManagerSystem = AkkaUtils.createActorSystem(
+                                       config, Option.apply(new Tuple2<String, 
Object>("localhost", 0)));
                        TaskManager.startTaskManagerComponentsAndActor(
                                        config, taskManagerSystem, "localhost",
                                        Option.<String>empty(), 
Option.<LeaderRetrievalService>empty(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 37e4e38..2b49c08 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -97,7 +97,7 @@ public class ProcessFailureCancelingITCase {
                        
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s");
                        
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s");
                        
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10);
-                       jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
"10 s");
+                       jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
"100 s");
 
                        jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<Tuple2<String, Object>>(localAddress));
                        ActorRef jmActor = JobManager.startJobManagerActors(

Reply via email to