Author: tedyu
Date: Fri Mar 30 17:45:57 2012
New Revision: 1307549

URL: http://svn.apache.org/viewvc?rev=1307549&view=rev
Log:
HBASE-5573 Replace client ZooKeeper watchers by simple ZooKeeper reads (N 
Keywal)

Modified:
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
 Fri Mar 30 17:45:57 2012
@@ -24,7 +24,10 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -66,6 +69,7 @@ import org.apache.zookeeper.KeeperExcept
  * </p>
  */
 public class ReplicationAdmin implements Closeable {
+  private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
 
   private final ReplicationZookeeper replicationZk;
   private final HConnection connection;
@@ -82,7 +86,7 @@ public class ReplicationAdmin implements
           "enable it in order to use replication");
     }
     this.connection = HConnectionManager.getConnection(conf);
-    ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
+    ZooKeeperWatcher zkw = createZooKeeperWatcher();
     try {
       this.replicationZk = new ReplicationZookeeper(this.connection, conf, 
zkw);
     } catch (KeeperException e) {
@@ -90,6 +94,24 @@ public class ReplicationAdmin implements
     }
   }
 
+  private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
+    return new ZooKeeperWatcher(connection.getConfiguration(),
+      "Replication Admin", new Abortable() {
+      @Override
+      public void abort(String why, Throwable e) {
+        LOG.error(why, e);
+        System.exit(1);
+      }
+
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+
+    });
+  }
+
+
   /**
    * Add a new peer cluster to replicate to.
    * @param id a short that identifies the cluster

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Fri 
Mar 30 17:45:57 2012
@@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.util.hbck
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
@@ -954,13 +955,15 @@ public class HBaseFsck {
     HConnectionManager.execute(new HConnectable<Void>(conf) {
       @Override
       public Void connect(HConnection connection) throws IOException {
-        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+        ZooKeeperWatcher zkw = createZooKeeperWatcher();
         try {
           for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) {
             disabledTables.add(Bytes.toBytes(tableName));
           }
         } catch (KeeperException ke) {
           throw new IOException(ke);
+        } finally {
+          zkw.close();
         }
         return null;
       }
@@ -1046,8 +1049,8 @@ public class HBaseFsck {
     ServerName sn;
     try {
       sn = getRootRegionServerName();
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted", e);
+    } catch (KeeperException e) {
+      throw new IOException(e);
     }
     MetaEntry m =
       new MetaEntry(rootLocation.getRegionInfo(), sn, 
System.currentTimeMillis());
@@ -1056,29 +1059,35 @@ public class HBaseFsck {
     return true;
   }
 
+  private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
+    return new ZooKeeperWatcher(conf, "hbase Fsck", new Abortable() {
+      @Override
+      public void abort(String why, Throwable e) {
+        LOG.error(why, e);
+        System.exit(1);
+      }
+
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+
+    });
+  }
+
   private ServerName getRootRegionServerName()
-  throws IOException, InterruptedException {
-    RootRegionTracker rootRegionTracker =
-      new RootRegionTracker(this.connection.getZooKeeperWatcher(), new 
Abortable() {
-        @Override
-        public void abort(String why, Throwable e) {
-          LOG.error(why, e);
-          System.exit(1);
-        }
-        @Override
-        public boolean isAborted(){
-          return false;
-        }
-        
-      });
-    rootRegionTracker.start();
-    ServerName sn = null;
+    throws IOException, KeeperException {
+
+    ZooKeeperWatcher zkw = createZooKeeperWatcher();
+
+    byte[] data;
     try {
-      sn = rootRegionTracker.getRootRegionLocation();
+      data = ZKUtil.getData(zkw, zkw.rootServerZNode);
     } finally {
-      rootRegionTracker.stop();
+      zkw.close();
     }
-    return sn;
+
+    return RootRegionTracker.dataToServerName(data);
   }
 
   /**

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Fri 
Mar 30 17:45:57 2012
@@ -1179,12 +1179,24 @@ public class ZKUtil {
     if (zkw == null) throw new IllegalArgumentException();
     if (znode == null) throw new IllegalArgumentException();
 
-    ZooKeeperNodeTracker znt = new ZooKeeperNodeTracker(zkw, znode, new 
Abortable() {
-      @Override public void abort(String why, Throwable e) {}
-      @Override public boolean isAborted() {return false;}
-    }) {
-    };
+    byte[] data = null;
+    boolean finished = false;
+    final long endTime = System.currentTimeMillis() + timeout;
+    while (!finished) {
+      try {
+        data = ZKUtil.getData(zkw, znode);
+      } catch(KeeperException e) {
+        LOG.warn("Unexpected exception handling blockUntilAvailable", e);
+      }
 
-    return znt.blockUntilAvailable(timeout, true);
+      if (data == null && (System.currentTimeMillis() +
+        HConstants.SOCKET_RETRY_WAIT_MS < endTime)) {
+        Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
+      } else {
+        finished = true;
+      }
+    }
+
+    return data;
   }
 }

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
Fri Mar 30 17:45:57 2012
@@ -712,6 +712,12 @@ public class HBaseTestingUtility {
       hbaseAdmin.close();
       hbaseAdmin = null;
     }
+
+    if (zooKeeperWatcher != null) {
+      zooKeeperWatcher.close();
+      zooKeeperWatcher = null;
+    }
+
     if (this.hbaseCluster != null) {
       this.hbaseCluster.shutdown();
       // Wait till hbase is down before going on to shutdown zk.
@@ -1418,6 +1424,32 @@ public class HBaseTestingUtility {
   private HBaseAdmin hbaseAdmin = null;
 
   /**
+   * Returns a ZooKeeperWatcher instance.
+   * This instance is shared between HBaseTestingUtility instance users.
+   * Don't close it, it will be closed automatically when the
+   * cluster shutdowns
+   *
+   * @return The ZooKeeperWatcher instance.
+   * @throws IOException
+   */
+  public synchronized ZooKeeperWatcher getZooKeeperWatcher()
+    throws IOException {
+    if (zooKeeperWatcher == null) {
+      zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
+        new Abortable() {
+        @Override public void abort(String why, Throwable e) {
+          throw new RuntimeException("Unexpected abort in 
HBaseTestingUtility:"+why, e);
+        }
+        @Override public boolean isAborted() {return false;}
+      });
+    }
+    return zooKeeperWatcher;
+  }
+  private ZooKeeperWatcher zooKeeperWatcher;
+
+
+
+  /**
    * Closes the named region.
    *
    * @param regionName  The region to close.

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java 
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Fri 
Mar 30 17:45:57 2012
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertNul
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -87,6 +89,17 @@ public class TestZooKeeper {
     TEST_UTIL.ensureSomeRegionServersAvailable(2);
   }
 
+
+  private ZooKeeperWatcher getZooKeeperWatcher(HConnection c) throws
+    NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+
+    Method getterZK = c.getClass().getMethod("getKeepAliveZooKeeperWatcher");
+    getterZK.setAccessible(true);
+
+    return (ZooKeeperWatcher) getterZK.invoke(c);
+  }
+
+
   /**
    * See HBASE-1232 and http://wiki.apache.org/hadoop/ZooKeeper/FAQ#4.
    * @throws IOException
@@ -102,7 +115,7 @@ public class TestZooKeeper {
 
     HConnection connection = HConnectionManager.getConnection(c);
 
-    ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
+    ZooKeeperWatcher connectionZK = getZooKeeperWatcher(connection);
     LOG.info("ZooKeeperWatcher= 0x"+ Integer.toHexString(
       connectionZK.hashCode()));
     LOG.info("getRecoverableZooKeeper= 0x"+ Integer.toHexString(
@@ -143,7 +156,7 @@ public class TestZooKeeper {
     Assert.assertTrue(state == States.CLOSED);
 
     // Check that the client recovered
-    ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
+    ZooKeeperWatcher newConnectionZK = getZooKeeperWatcher(connection);
 
     States state2 = newConnectionZK.getRecoverableZooKeeper().getState();
     LOG.info("After new get state=" +state2);
@@ -154,7 +167,7 @@ public class TestZooKeeper {
     while (System.currentTimeMillis() < limit2 &&
       state2 != States.CONNECTED && state2 != States.CONNECTING) {
 
-      newConnectionZK = connection.getZooKeeperWatcher();
+      newConnectionZK = getZooKeeperWatcher(connection);
       state2 = newConnectionZK.getRecoverableZooKeeper().getState();
     }
     LOG.info("After new get state loop=" + state2);
@@ -233,11 +246,13 @@ public class TestZooKeeper {
       ipMeta.exists(new Get(HConstants.LAST_ROW));
 
       // make sure they aren't the same
-      
assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()).getZooKeeperWatcher()
-          == 
HConnectionManager.getConnection(otherConf).getZooKeeperWatcher());
-      
assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration())
-          .getZooKeeperWatcher().getQuorum().equals(HConnectionManager
-              .getConnection(otherConf).getZooKeeperWatcher().getQuorum()));
+      ZooKeeperWatcher z1 =
+        
getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration()));
+      ZooKeeperWatcher z2 =
+        getZooKeeperWatcher(HConnectionManager.getConnection(otherConf));
+      assertFalse(z1 == z2);
+      assertFalse(z1.getQuorum().equals(z2.getQuorum()));
+
       localMeta.close();
       ipMeta.close();
     } catch (Exception e) {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java 
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Fri 
Mar 30 17:45:57 2012
@@ -274,6 +274,27 @@ public class TestHCM {
     assertTrue(c2 != c3);
   }
 
+
+  /**
+   * This test checks that one can connect to the cluster with only the
+   *  ZooKeeper quorum set. Other stuff like master address will be read
+   *  from ZK by the client.
+   */
+  @Test(timeout = 10000)
+  public void testConnection() throws Exception{
+    // We create an empty config and add the ZK address.
+    Configuration c = new Configuration();
+    c.set(HConstants.ZOOKEEPER_QUORUM,
+      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+    c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
+      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
+
+    // This should be enough to connect
+    HConnection conn = HConnectionManager.getConnection(c);
+    assertTrue( conn.isMasterRunning() );
+    conn.close();
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
 Fri Mar 30 17:45:57 2012
@@ -132,13 +132,13 @@ public class TestSplitTransactionOnClust
       List<HRegion> daughters = cluster.getRegions(tableName);
       assertTrue(daughters.size() >= 2);
       // Assert the ephemeral node is up in zk.
-      String path = 
ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
+      String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(),
         hri.getEncodedName());
       Stat stats =
-        
t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, 
false);
+        
TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, 
false);
       LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" 
+ stats);
       RegionTransitionData rtd =
-        ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
+        ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
           hri.getEncodedName());
       // State could be SPLIT or SPLITTING.
       assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) ||
@@ -158,7 +158,7 @@ public class TestSplitTransactionOnClust
         assertTrue(daughters.contains(r));
       }
       // Finally assert that the ephemeral SPLIT znode was cleaned up.
-      stats = 
t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, 
false);
+      stats = 
TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, 
false);
       LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" 
+ stats);
       assertTrue(stats == null);
     } finally {
@@ -195,7 +195,7 @@ public class TestSplitTransactionOnClust
       int regionCount = server.getOnlineRegions().size();
       // Insert into zk a blocking znode, a znode of same name as region
       // so it gets in way of our splitting.
-      ZKAssign.createNodeClosing(t.getConnection().getZooKeeperWatcher(),
+      ZKAssign.createNodeClosing(TESTING_UTIL.getZooKeeperWatcher(),
         hri, new ServerName("any.old.server", 1234, -1));
       // Now try splitting.... should fail.  And each should successfully
       // rollback.
@@ -208,7 +208,7 @@ public class TestSplitTransactionOnClust
         assertEquals(regionCount, server.getOnlineRegions().size());
       }
       // Now clear the zknode
-      ZKAssign.deleteClosingNode(t.getConnection().getZooKeeperWatcher(), hri);
+      ZKAssign.deleteClosingNode(TESTING_UTIL.getZooKeeperWatcher(), hri);
       // Now try splitting and it should work.
       split(hri, server, regionCount);
       // Get daughters


Reply via email to