Author: todd Date: Fri Mar 30 20:23:59 2012 New Revision: 1307596 URL: http://svn.apache.org/viewvc?rev=1307596&view=rev Log: HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly. Contributed by Todd Lipcon.
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt?rev=1307596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt (added) +++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt Fri Mar 30 20:23:59 2012 @@ -0,0 +1,8 @@ +Changes for HDFS-3042 branch. + +This change list will be merged into the trunk CHANGES.txt when the HDFS-3042 +branch is merged. +------------------------------ + +HADOOP-8220.
ZKFailoverController doesn't handle failure to become active correctly (todd) + Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1307596&r1=1307595&r2=1307596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Fri Mar 30 20:23:59 2012 @@ -81,9 +81,15 @@ public class ActiveStandbyElector implem */ public interface ActiveStandbyElectorCallback { /** - * This method is called when the app becomes the active leader + * This method is called when the app becomes the active leader. + * If the service fails to become active, it should throw + * ServiceFailedException. This will cause the elector to + * sleep for a short period, then re-join the election. + * + * Callback implementations are expected to manage their own + * timeouts (e.g. when making an RPC to a remote node). */ - void becomeActive(); + void becomeActive() throws ServiceFailedException; /** * This method is called when the app becomes a standby @@ -135,6 +141,7 @@ public class ActiveStandbyElector implem public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); private static final int NUM_RETRIES = 3; + private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000; private static enum ConnectionState { DISCONNECTED, CONNECTED, TERMINATED @@ -385,8 +392,11 @@ public class ActiveStandbyElector implem Code code = Code.get(rc); if (isSuccess(code)) { // we successfully created the znode. we are the leader. 
start monitoring - becomeActive(); - monitorActiveStatus(); + if (becomeActive()) { + monitorActiveStatus(); + } else { + reJoinElectionAfterFailureToBecomeActive(); + } return; } @@ -442,7 +452,9 @@ public class ActiveStandbyElector implem // creation was retried if (stat.getEphemeralOwner() == zkClient.getSessionId()) { // we own the lock znode. so we are the leader - becomeActive(); + if (!becomeActive()) { + reJoinElectionAfterFailureToBecomeActive(); + } } else { // we dont own the lock znode. so we are a standby. becomeStandby(); @@ -480,6 +492,17 @@ public class ActiveStandbyElector implem } /** + * We failed to become active. Re-join the election, but + * sleep briefly (SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE ms) after terminating our existing + * session, so that other nodes have a chance to become active. + * The failure to become active is already logged inside + * becomeActive(). + */ + private void reJoinElectionAfterFailureToBecomeActive() { + reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE); + } + + /** * interface implementation of Zookeeper watch events (connection and node), * proxied by {@link WatcherWithClientRef}. */ @@ -516,7 +539,7 @@ public class ActiveStandbyElector implem // call listener to reconnect LOG.info("Session expired. Entering neutral mode and rejoining..."); enterNeutralMode(); - reJoinElection(); + reJoinElection(0); break; default: fatalError("Unexpected Zookeeper watch event state: " @@ -591,7 +614,7 @@ public class ActiveStandbyElector implem createLockNodeAsync(); } - private void reJoinElection() { + private void reJoinElection(int sleepTime) { LOG.info("Trying to re-establish ZK session"); // Some of the test cases rely on expiring the ZK sessions and @@ -604,12 +627,30 @@ public class ActiveStandbyElector implem sessionReestablishLockForTests.lock(); try { terminateConnection(); + sleepFor(sleepTime); + joinElectionInternal(); } finally { sessionReestablishLockForTests.unlock(); } } - + + /** + * Sleep for the given number of milliseconds.
+ * This is non-static, and separated out, so that unit tests + * can override the behavior not to sleep. + */ + @VisibleForTesting + protected void sleepFor(int sleepMs) { + if (sleepMs > 0) { + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + @VisibleForTesting void preventSessionReestablishmentForTests() { sessionReestablishLockForTests.lock(); @@ -640,11 +681,7 @@ public class ActiveStandbyElector implem success = true; } catch(IOException e) { LOG.warn(e); - try { - Thread.sleep(5000); - } catch(InterruptedException e1) { - LOG.warn(e1); - } + sleepFor(5000); } ++connectionRetryCount; } @@ -675,20 +712,24 @@ public class ActiveStandbyElector implem terminateConnection(); } - private void becomeActive() { + private boolean becomeActive() { assert wantToBeInElection; - if (state != State.ACTIVE) { - try { - Stat oldBreadcrumbStat = fenceOldActive(); - writeBreadCrumbNode(oldBreadcrumbStat); - } catch (Exception e) { - LOG.warn("Exception handling the winning of election", e); - reJoinElection(); - return; - } + if (state == State.ACTIVE) { + // already active + return true; + } + try { + Stat oldBreadcrumbStat = fenceOldActive(); + writeBreadCrumbNode(oldBreadcrumbStat); + LOG.debug("Becoming active"); - state = State.ACTIVE; appClient.becomeActive(); + state = State.ACTIVE; + return true; + } catch (Exception e) { + LOG.warn("Exception handling the winning of election", e); + // Caller will handle quitting and rejoining the election. 
+ return false; } } Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1307596&r1=1307595&r2=1307596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Fri Mar 30 20:23:59 2012 @@ -242,15 +242,20 @@ public abstract class ZKFailoverControll notifyAll(); } - private synchronized void becomeActive() { + private synchronized void becomeActive() throws ServiceFailedException { LOG.info("Trying to make " + localTarget + " active..."); try { - localTarget.getProxy().transitionToActive(); + HAServiceProtocolHelper.transitionToActive(localTarget.getProxy()); LOG.info("Successfully transitioned " + localTarget + " to active state"); } catch (Throwable t) { LOG.fatal("Couldn't make " + localTarget + " active", t); - elector.quitElection(true); + if (t instanceof ServiceFailedException) { + throw (ServiceFailedException)t; + } else { + throw new ServiceFailedException("Couldn't transition to active", + t); + } /* * TODO: * we need to make sure that if we get fenced and then quickly restarted, @@ -297,7 +302,7 @@ public abstract class ZKFailoverControll */ class ElectorCallbacks implements ActiveStandbyElectorCallback { @Override - public void becomeActive() { + public void becomeActive() throws ServiceFailedException { ZKFailoverController.this.becomeActive(); } Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java?rev=1307596&r1=1307595&r2=1307596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java Fri Mar 30 20:23:59 2012 @@ -19,16 +19,25 @@ package org.apache.hadoop.ha; import java.util.Arrays; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ZooKeeperServer; public abstract class ActiveStandbyElectorTestUtil { + + private static final Log LOG = LogFactory.getLog( + ActiveStandbyElectorTestUtil.class); + private static final long LOG_INTERVAL_MS = 500; public static void waitForActiveLockData(TestContext ctx, ZooKeeperServer zks, String parentDir, byte[] activeData) throws Exception { + long st = System.currentTimeMillis(); + long lastPrint = st; while (true) { if (ctx != null) { ctx.checkException(); @@ -42,10 +51,18 @@ public abstract class ActiveStandbyElect Arrays.equals(activeData, data)) { return; } + if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) { + LOG.info("Cur data: " + StringUtils.byteToHexString(data)); + lastPrint = System.currentTimeMillis(); + } } catch (NoNodeException nne) { if (activeData == null) { return; } + if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) { + LOG.info("Cur data: no node"); + lastPrint = System.currentTimeMillis(); + } } Thread.sleep(50); } 
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1307596&r1=1307595&r2=1307596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java Fri Mar 30 20:23:59 2012 @@ -51,6 +51,8 @@ public class TestActiveStandbyElector { private ActiveStandbyElectorTester elector; class ActiveStandbyElectorTester extends ActiveStandbyElector { + private int sleptFor = 0; + ActiveStandbyElectorTester(String hostPort, int timeout, String parent, List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException { super(hostPort, timeout, parent, acl, app); @@ -61,6 +63,14 @@ public class TestActiveStandbyElector { ++count; return mockZK; } + + @Override + protected void sleepFor(int ms) { + // don't sleep in unit tests! Instead, just record the amount of + // time slept + LOG.info("Would have slept for " + ms + "ms"); + sleptFor += ms; + } } private static final String ZK_PARENT_NAME = "/parent/node"; @@ -147,6 +157,68 @@ public class TestActiveStandbyElector { } /** + * Verify that, when the callback fails to enter active state, + * the elector rejoins the election after sleeping for a short period. 
+ */ + @Test + public void testFailToBecomeActive() throws Exception { + mockNoPriorActive(); + elector.joinElection(data); + Assert.assertEquals(0, elector.sleptFor); + + Mockito.doThrow(new ServiceFailedException("failed to become active")) + .when(mockApp).becomeActive(); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, + ZK_LOCK_NAME); + // Should have tried to become active + Mockito.verify(mockApp).becomeActive(); + + // should re-join + Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + Assert.assertEquals(2, count); + Assert.assertTrue(elector.sleptFor > 0); + } + + /** + * Verify that, when the callback fails to enter active state, after + * a ZK disconnect (i.e from the StatCallback), that the elector rejoins + * the election after sleeping for a short period. + */ + @Test + public void testFailToBecomeActiveAfterZKDisconnect() throws Exception { + mockNoPriorActive(); + elector.joinElection(data); + Assert.assertEquals(0, elector.sleptFor); + + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, + ZK_LOCK_NAME); + Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, + ZK_LOCK_NAME); + verifyExistCall(1); + + Stat stat = new Stat(); + stat.setEphemeralOwner(1L); + Mockito.when(mockZK.getSessionId()).thenReturn(1L); + + // Fake failure to become active from within the stat callback + Mockito.doThrow(new ServiceFailedException("fail to become active")) + .when(mockApp).becomeActive(); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); + Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); + + // should re-join + Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + 
Assert.assertEquals(2, count); + Assert.assertTrue(elector.sleptFor > 0); + } + + + /** * Verify that, if there is a record of a prior active node, the * elector asks the application to fence it before becoming active. */ Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1307596&r1=1307595&r2=1307596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Fri Mar 30 20:23:59 2012 @@ -273,7 +273,8 @@ public class TestZKFailoverController ex waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY); waitForActiveLockHolder(null); - Mockito.verify(svc2.proxy).transitionToActive(); + Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce()) + .transitionToActive(); waitForHAState(svc1, HAServiceState.STANDBY); waitForHAState(svc2, HAServiceState.STANDBY); @@ -283,6 +284,12 @@ public class TestZKFailoverController ex waitForHAState(svc1, HAServiceState.ACTIVE); waitForHAState(svc2, HAServiceState.STANDBY); waitForActiveLockHolder(svc1); + + // Ensure that we can fail back to thr2 once it is able + // to become active (e.g. the admin has restarted it) + LOG.info("Allowing svc2 to become active, expiring svc1"); + svc2.failToBecomeActive = false; + expireAndVerifyFailover(thr1, thr2); } finally { stopFCs(); }