Repository: flink Updated Branches: refs/heads/master 0b23b5ea1 -> 0d19e94a1
[FLINK-2616] [test-stability] Fixes ZooKeeperLeaderElectionTest.testMultipleLeaders by introducing a second retrieval service to retrieve the leader address after the faulty address has been written. This closes #1173. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d19e94a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d19e94a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d19e94a Branch: refs/heads/master Commit: 0d19e94a18a334b6b5843c3ea81e20af096d7f35 Parents: 0b23b5e Author: Till Rohrmann <[email protected]> Authored: Wed Sep 23 14:34:38 2015 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Sep 24 14:56:11 2015 +0200 ---------------------------------------------------------------------- .../StandaloneLeaderElectionTest.java | 2 +- .../runtime/leaderelection/TestingListener.java | 27 ---------------- .../ZooKeeperLeaderElectionTest.java | 33 +++++++++++--------- tools/log4j-travis.properties | 11 ++++--- 4 files changed, 25 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java index 86401bc..b04be63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java @@ -48,7 +48,7 @@ public class StandaloneLeaderElectionTest extends TestLogger { assertTrue(contender.isLeader()); assertEquals(null, contender.getLeaderSessionID()); - testingListener.waitForLeader(1000l); + testingListener.waitForNewLeader(1000l); assertEquals(TEST_URL, testingListener.getAddress()); assertEquals(null, testingListener.getLeaderSessionID()); http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java index 7b3d06f..54ee822 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java @@ -47,33 +47,6 @@ public class TestingListener implements LeaderRetrievalListener { return leaderSessionID; } - public void clear() { - address = null; - leaderSessionID = null; - } - - public void waitForLeader(long timeout) throws Exception { - long start = System.currentTimeMillis(); - long curTimeout; - - while (exception == null && address == null && (curTimeout = timeout - System.currentTimeMillis() + start) > 0) { - synchronized (lock) { - try { - lock.wait(curTimeout); - } catch (InterruptedException e) { - // we got interrupted so check again for the condition - } - } - } - - if (exception != null) { - throw exception; - } else if (address == null) { - throw new TimeoutException("Listener was not notified about a leader within " + - timeout + "ms"); - } - } - public void waitForNewLeader(long timeout) throws Exception { long start = System.currentTimeMillis(); long curTimeout; http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index 7c7867a..34a582f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -105,7 +105,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { assertTrue(contender.isLeader()); assertEquals(leaderElectionService.getLeaderSessionID(), contender.getLeaderSessionID()); - listener.waitForLeader(timeout.toMillis()); + listener.waitForNewLeader(timeout.toMillis()); assertEquals(TEST_URL, listener.getAddress()); assertEquals(leaderElectionService.getLeaderSessionID(), listener.getLeaderSessionID()); @@ -226,7 +226,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { Pattern regex = Pattern.compile(pattern); for (int i = 0; i < numTries; i++) { - listener.waitForLeader(timeout.toMillis()); + listener.waitForNewLeader(timeout.toMillis()); String address = listener.getAddress(); @@ -238,9 +238,6 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { assertEquals(listener.getLeaderSessionID(), contenders[index].getLeaderSessionID()); - // clear the current leader of the listener - listener.clear(); - // stop leader election service = revoke leadership leaderElectionService[index].stop(); // create new leader election service which takes part in the leader election @@ -285,25 +282,26 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { ZooKeeperLeaderElectionService leaderElectionService = null; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; + ZooKeeperLeaderRetrievalService leaderRetrievalService2 = null; TestingListener listener = new TestingListener(); + TestingListener listener2 = new TestingListener(); TestingContender contender; try { leaderElectionService = ZooKeeperUtils.createLeaderElectionService(configuration); leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration); + leaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService(configuration); contender = new TestingContender(TEST_URL, leaderElectionService); leaderElectionService.start(contender); leaderRetrievalService.start(listener); - listener.waitForLeader(timeout.toMillis()); + listener.waitForNewLeader(timeout.toMillis()); assertEquals(listener.getLeaderSessionID(), contender.getLeaderSessionID()); assertEquals(TEST_URL, listener.getAddress()); - listener.clear(); - CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -329,15 +327,16 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { } } - listener.waitForLeader(timeout.toMillis()); + leaderRetrievalService2.start(listener2); + + listener2.waitForNewLeader(timeout.toMillis()); - if (FAULTY_CONTENDER_URL.equals(listener.getAddress())) { - listener.clear(); - listener.waitForLeader(timeout.toMillis()); + if (FAULTY_CONTENDER_URL.equals(listener2.getAddress())) { + listener2.waitForNewLeader(timeout.toMillis()); } - assertEquals(listener.getLeaderSessionID(), contender.getLeaderSessionID()); - assertEquals(listener.getAddress(), contender.getAddress()); + assertEquals(listener2.getLeaderSessionID(), contender.getLeaderSessionID()); + assertEquals(listener2.getAddress(), contender.getAddress()); } finally { if (leaderElectionService != null) { @@ -347,6 +346,10 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { if (leaderRetrievalService != null) { leaderRetrievalService.stop(); } + + if (leaderRetrievalService2 != null) { + leaderRetrievalService2.stop(); + } } } @@ -474,7 +477,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { leaderRetrievalService.start(listener); try { - listener.waitForLeader(1000); + listener.waitForNewLeader(1000); fail("TimeoutException was expected because there is no leader registered and " + "thus there shouldn't be any leader information in ZooKeeper."); http://git-wip-us.apache.org/repos/asf/flink/blob/0d19e94a/tools/log4j-travis.properties ---------------------------------------------------------------------- diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties index 1cdd152..53379b4 100644 --- a/tools/log4j-travis.properties +++ b/tools/log4j-travis.properties @@ -35,11 +35,12 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file -log4j.logger.org.apache.zookeeper=ERROR, file -log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF, file -log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG,file -log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG,file +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF +log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG +log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG + # Log a bit when running the flink-yarn-tests to avoid running into the 5 minutes timeout for # the tests log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
