Author: umamahesh
Date: Wed Apr 23 19:04:33 2014
New Revision: 1589495

URL: http://svn.apache.org/r1589495
Log:
Merge HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is 
unstable. Contributed by Vinayakumar B.

Modified:
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt 
(original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt 
Wed Apr 23 19:04:33 2014
@@ -78,6 +78,9 @@ Release 2.5.0 - UNRELEASED
     HADOOP-10526. Chance for Stream leakage in CompressorStream. (Rushabh 
     Shah via kihwal)
 
+    HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is 
unstable
+    (Vinayakumar B via umamahesh)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
 Wed Apr 23 19:04:33 2014
@@ -74,6 +74,9 @@ public class HealthMonitor {
   private List<Callback> callbacks = Collections.synchronizedList(
       new LinkedList<Callback>());
 
+  private List<ServiceStateCallback> serviceStateCallbacks = Collections
+      .synchronizedList(new LinkedList<ServiceStateCallback>());
+
   private HAServiceStatus lastServiceState = new HAServiceStatus(
       HAServiceState.INITIALIZING);
   
@@ -134,7 +137,15 @@ public class HealthMonitor {
   public void removeCallback(Callback cb) {
     callbacks.remove(cb);
   }
-  
+
+  public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
+    this.serviceStateCallbacks.add(cb);
+  }
+
+  public synchronized void removeServiceStateCallback(ServiceStateCallback cb) 
{
+    serviceStateCallbacks.remove(cb);
+  }
+
   public void shutdown() {
     LOG.info("Stopping HealthMonitor thread");
     shouldRun = false;
@@ -217,6 +228,9 @@ public class HealthMonitor {
   
   private synchronized void setLastServiceStatus(HAServiceStatus status) {
     this.lastServiceState = status;
+    for (ServiceStateCallback cb : serviceStateCallbacks) {
+      cb.reportServiceStatus(lastServiceState);
+    }
   }
 
   private synchronized void enterState(State newState) {
@@ -293,4 +307,11 @@ public class HealthMonitor {
   static interface Callback {
     void enteredState(State newState);
   }
+
+  /**
+   * Callback interface for service states.
+   */
+  static interface ServiceStateCallback {
+    void reportServiceStatus(HAServiceStatus status);
+  }
 }

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
 Wed Apr 23 19:04:33 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.util.ZKUtil;
@@ -105,6 +106,8 @@ public abstract class ZKFailoverControll
 
   private State lastHealthState = State.INITIALIZING;
 
+  private volatile HAServiceState serviceState = HAServiceState.INITIALIZING;
+
   /** Set if a fatal error occurs */
   private String fatalError = null;
 
@@ -294,6 +297,7 @@ public abstract class ZKFailoverControll
   private void initHM() {
     healthMonitor = new HealthMonitor(conf, localTarget);
     healthMonitor.addCallback(new HealthCallbacks());
+    healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
     healthMonitor.start();
   }
   
@@ -376,6 +380,7 @@ public abstract class ZKFailoverControll
       String msg = "Successfully transitioned " + localTarget +
           " to active state";
       LOG.info(msg);
+      serviceState = HAServiceState.ACTIVE;
       recordActiveAttempt(new ActiveAttemptRecord(true, msg));
 
     } catch (Throwable t) {
@@ -484,6 +489,7 @@ public abstract class ZKFailoverControll
       // TODO handle this. It's a likely case since we probably got fenced
       // at the same time.
     }
+    serviceState = HAServiceState.STANDBY;
   }
   
 
@@ -574,6 +580,7 @@ public abstract class ZKFailoverControll
         delayJoiningUntilNanotime = System.nanoTime() +
             TimeUnit.MILLISECONDS.toNanos(millisToCede);
         elector.quitElection(needFence);
+        serviceState = HAServiceState.INITIALIZING;
       }
     }
     recheckElectability();
@@ -739,12 +746,16 @@ public abstract class ZKFailoverControll
         switch (lastHealthState) {
         case SERVICE_HEALTHY:
           elector.joinElection(targetToData(localTarget));
+          if (quitElectionOnBadState) {
+            quitElectionOnBadState = false;
+          }
           break;
           
         case INITIALIZING:
           LOG.info("Ensuring that " + localTarget + " does not " +
               "participate in active master election");
           elector.quitElection(false);
+          serviceState = HAServiceState.INITIALIZING;
           break;
     
         case SERVICE_UNHEALTHY:
@@ -752,6 +763,7 @@ public abstract class ZKFailoverControll
           LOG.info("Quitting master election for " + localTarget +
               " and marking that fencing is necessary");
           elector.quitElection(true);
+          serviceState = HAServiceState.INITIALIZING;
           break;
           
         case HEALTH_MONITOR_FAILED:
@@ -784,6 +796,44 @@ public abstract class ZKFailoverControll
         whenNanos, TimeUnit.NANOSECONDS);
   }
 
+  int serviceStateMismatchCount = 0;
+  boolean quitElectionOnBadState = false;
+
+  void verifyChangedServiceState(HAServiceState changedState) {
+    synchronized (elector) {
+      synchronized (this) {
+        if (serviceState == HAServiceState.INITIALIZING) {
+          if (quitElectionOnBadState) {
+            LOG.debug("rechecking for electability from bad state");
+            recheckElectability();
+          }
+          return;
+        }
+        if (changedState == serviceState) {
+          serviceStateMismatchCount = 0;
+          return;
+        }
+        if (serviceStateMismatchCount == 0) {
+          // recheck one more time. As this might be due to parallel 
transition.
+          serviceStateMismatchCount++;
+          return;
+        }
+        // quit the election as the expected state and reported state
+        // mismatches.
+        LOG.error("Local service " + localTarget
+            + " has changed the serviceState to " + changedState
+            + ". Expected was " + serviceState
+            + ". Quitting election marking fencing necessary.");
+        delayJoiningUntilNanotime = System.nanoTime()
+            + TimeUnit.MILLISECONDS.toNanos(1000);
+        elector.quitElection(true);
+        quitElectionOnBadState = true;
+        serviceStateMismatchCount = 0;
+        serviceState = HAServiceState.INITIALIZING;
+      }
+    }
+  }
+
   /**
    * @return the last health state passed to the FC
    * by the HealthMonitor.
@@ -855,7 +905,17 @@ public abstract class ZKFailoverControll
       recheckElectability();
     }
   }
-  
+
+  /**
+   * Callbacks for HAServiceStatus
+   */
+  class ServiceStateCallBacks implements HealthMonitor.ServiceStateCallback {
+    @Override
+    public void reportServiceStatus(HAServiceStatus status) {
+      verifyChangedServiceState(status.getState());
+    }
+  }
+
   private static class ActiveAttemptRecord {
     private final boolean succeeded;
     private final String status;

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1589495&r1=1589494&r2=1589495&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
 Wed Apr 23 19:04:33 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ha;
 
 import static org.junit.Assert.*;
-import static org.junit.Assume.assumeTrue;
 
 import java.security.NoSuchAlgorithmException;
 
@@ -29,7 +28,6 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.HealthMonitor.State;
 import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.apache.zookeeper.KeeperException;
@@ -68,8 +66,6 @@ public class TestZKFailoverController ex
   
   @Before
   public void setupConfAndServices() {
-    // skip tests on Windows until after resolution of ZooKeeper client bug
-    assumeTrue(!Shell.WINDOWS);
     conf = new Configuration();
     conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL);
     conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD);
@@ -232,6 +228,27 @@ public class TestZKFailoverController ex
       cluster.stop();
     }
   }
+
+  /**
+   * Test that, when the health monitor indicates bad health status,
+   * failover is triggered. Also ensures that graceful active->standby
+   * transition is used when possible, falling back to fencing when
+   * the graceful approach fails.
+   */
+  @Test(timeout=15000)
+  public void testAutoFailoverOnBadState() throws Exception {
+    try {
+      cluster.start();
+      DummyHAService svc0 = cluster.getService(0);
+      LOG.info("Faking svc0 to change the state, should failover to svc1");
+      svc0.state = HAServiceState.STANDBY;
+      
+      // Should fail back to svc0 at this point
+      cluster.waitForHAState(1, HAServiceState.ACTIVE);
+    } finally {
+      cluster.stop();
+    }
+  }
   
   @Test(timeout=15000)
   public void testAutoFailoverOnLostZKSession() throws Exception {


Reply via email to