Author: tedyu
Date: Fri Mar 30 17:45:57 2012
New Revision: 1307549
URL: http://svn.apache.org/viewvc?rev=1307549&view=rev
Log:
HBASE-5573 Replace client ZooKeeper watchers by simple ZooKeeper reads (N
Keywal)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
Fri Mar 30 17:45:57 2012
@@ -24,7 +24,10 @@ import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -66,6 +69,7 @@ import org.apache.zookeeper.KeeperExcept
* </p>
*/
public class ReplicationAdmin implements Closeable {
+ private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
private final ReplicationZookeeper replicationZk;
private final HConnection connection;
@@ -82,7 +86,7 @@ public class ReplicationAdmin implements
"enable it in order to use replication");
}
this.connection = HConnectionManager.getConnection(conf);
- ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
+ ZooKeeperWatcher zkw = createZooKeeperWatcher();
try {
this.replicationZk = new ReplicationZookeeper(this.connection, conf,
zkw);
} catch (KeeperException e) {
@@ -90,6 +94,24 @@ public class ReplicationAdmin implements
}
}
+ private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
+ return new ZooKeeperWatcher(connection.getConfiguration(),
+ "Replication Admin", new Abortable() {
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.error(why, e);
+ System.exit(1);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ });
+ }
+
+
/**
* Add a new peer cluster to replicate to.
* @param id a short that identifies the cluster
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Fri
Mar 30 17:45:57 2012
@@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.util.hbck
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -954,13 +955,15 @@ public class HBaseFsck {
HConnectionManager.execute(new HConnectable<Void>(conf) {
@Override
public Void connect(HConnection connection) throws IOException {
- ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ ZooKeeperWatcher zkw = createZooKeeperWatcher();
try {
for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) {
disabledTables.add(Bytes.toBytes(tableName));
}
} catch (KeeperException ke) {
throw new IOException(ke);
+ } finally {
+ zkw.close();
}
return null;
}
@@ -1046,8 +1049,8 @@ public class HBaseFsck {
ServerName sn;
try {
sn = getRootRegionServerName();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted", e);
+ } catch (KeeperException e) {
+ throw new IOException(e);
}
MetaEntry m =
new MetaEntry(rootLocation.getRegionInfo(), sn,
System.currentTimeMillis());
@@ -1056,29 +1059,35 @@ public class HBaseFsck {
return true;
}
+ private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
+ return new ZooKeeperWatcher(conf, "hbase Fsck", new Abortable() {
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.error(why, e);
+ System.exit(1);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ });
+ }
+
private ServerName getRootRegionServerName()
- throws IOException, InterruptedException {
- RootRegionTracker rootRegionTracker =
- new RootRegionTracker(this.connection.getZooKeeperWatcher(), new
Abortable() {
- @Override
- public void abort(String why, Throwable e) {
- LOG.error(why, e);
- System.exit(1);
- }
- @Override
- public boolean isAborted(){
- return false;
- }
-
- });
- rootRegionTracker.start();
- ServerName sn = null;
+ throws IOException, KeeperException {
+
+ ZooKeeperWatcher zkw = createZooKeeperWatcher();
+
+ byte[] data;
try {
- sn = rootRegionTracker.getRootRegionLocation();
+ data = ZKUtil.getData(zkw, zkw.rootServerZNode);
} finally {
- rootRegionTracker.stop();
+ zkw.close();
}
- return sn;
+
+ return RootRegionTracker.dataToServerName(data);
}
/**
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Fri
Mar 30 17:45:57 2012
@@ -1179,12 +1179,24 @@ public class ZKUtil {
if (zkw == null) throw new IllegalArgumentException();
if (znode == null) throw new IllegalArgumentException();
- ZooKeeperNodeTracker znt = new ZooKeeperNodeTracker(zkw, znode, new
Abortable() {
- @Override public void abort(String why, Throwable e) {}
- @Override public boolean isAborted() {return false;}
- }) {
- };
+ byte[] data = null;
+ boolean finished = false;
+ final long endTime = System.currentTimeMillis() + timeout;
+ while (!finished) {
+ try {
+ data = ZKUtil.getData(zkw, znode);
+ } catch(KeeperException e) {
+ LOG.warn("Unexpected exception handling blockUntilAvailable", e);
+ }
- return znt.blockUntilAvailable(timeout, true);
+ if (data == null && (System.currentTimeMillis() +
+ HConstants.SOCKET_RETRY_WAIT_MS < endTime)) {
+ Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
+ } else {
+ finished = true;
+ }
+ }
+
+ return data;
}
}
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
Fri Mar 30 17:45:57 2012
@@ -712,6 +712,12 @@ public class HBaseTestingUtility {
hbaseAdmin.close();
hbaseAdmin = null;
}
+
+ if (zooKeeperWatcher != null) {
+ zooKeeperWatcher.close();
+ zooKeeperWatcher = null;
+ }
+
if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown();
// Wait till hbase is down before going on to shutdown zk.
@@ -1418,6 +1424,32 @@ public class HBaseTestingUtility {
private HBaseAdmin hbaseAdmin = null;
/**
+ * Returns a ZooKeeperWatcher instance.
+ * This instance is shared between HBaseTestingUtility instance users.
+ * Don't close it, it will be closed automatically when the
+ * cluster shutdowns
+ *
+ * @return The ZooKeeperWatcher instance.
+ * @throws IOException
+ */
+ public synchronized ZooKeeperWatcher getZooKeeperWatcher()
+ throws IOException {
+ if (zooKeeperWatcher == null) {
+ zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
+ new Abortable() {
+ @Override public void abort(String why, Throwable e) {
+ throw new RuntimeException("Unexpected abort in
HBaseTestingUtility:"+why, e);
+ }
+ @Override public boolean isAborted() {return false;}
+ });
+ }
+ return zooKeeperWatcher;
+ }
+ private ZooKeeperWatcher zooKeeperWatcher;
+
+
+
+ /**
* Closes the named region.
*
* @param regionName The region to close.
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Fri
Mar 30 17:45:57 2012
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertNul
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,6 +89,17 @@ public class TestZooKeeper {
TEST_UTIL.ensureSomeRegionServersAvailable(2);
}
+
+ private ZooKeeperWatcher getZooKeeperWatcher(HConnection c) throws
+ NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+
+ Method getterZK = c.getClass().getMethod("getKeepAliveZooKeeperWatcher");
+ getterZK.setAccessible(true);
+
+ return (ZooKeeperWatcher) getterZK.invoke(c);
+ }
+
+
/**
* See HBASE-1232 and http://wiki.apache.org/hadoop/ZooKeeper/FAQ#4.
* @throws IOException
@@ -102,7 +115,7 @@ public class TestZooKeeper {
HConnection connection = HConnectionManager.getConnection(c);
- ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
+ ZooKeeperWatcher connectionZK = getZooKeeperWatcher(connection);
LOG.info("ZooKeeperWatcher= 0x"+ Integer.toHexString(
connectionZK.hashCode()));
LOG.info("getRecoverableZooKeeper= 0x"+ Integer.toHexString(
@@ -143,7 +156,7 @@ public class TestZooKeeper {
Assert.assertTrue(state == States.CLOSED);
// Check that the client recovered
- ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
+ ZooKeeperWatcher newConnectionZK = getZooKeeperWatcher(connection);
States state2 = newConnectionZK.getRecoverableZooKeeper().getState();
LOG.info("After new get state=" +state2);
@@ -154,7 +167,7 @@ public class TestZooKeeper {
while (System.currentTimeMillis() < limit2 &&
state2 != States.CONNECTED && state2 != States.CONNECTING) {
- newConnectionZK = connection.getZooKeeperWatcher();
+ newConnectionZK = getZooKeeperWatcher(connection);
state2 = newConnectionZK.getRecoverableZooKeeper().getState();
}
LOG.info("After new get state loop=" + state2);
@@ -233,11 +246,13 @@ public class TestZooKeeper {
ipMeta.exists(new Get(HConstants.LAST_ROW));
// make sure they aren't the same
-
assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration()).getZooKeeperWatcher()
- ==
HConnectionManager.getConnection(otherConf).getZooKeeperWatcher());
-
assertFalse(HConnectionManager.getConnection(localMeta.getConfiguration())
- .getZooKeeperWatcher().getQuorum().equals(HConnectionManager
- .getConnection(otherConf).getZooKeeperWatcher().getQuorum()));
+ ZooKeeperWatcher z1 =
+
getZooKeeperWatcher(HConnectionManager.getConnection(localMeta.getConfiguration()));
+ ZooKeeperWatcher z2 =
+ getZooKeeperWatcher(HConnectionManager.getConnection(otherConf));
+ assertFalse(z1 == z2);
+ assertFalse(z1.getQuorum().equals(z2.getQuorum()));
+
localMeta.close();
ipMeta.close();
} catch (Exception e) {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Fri
Mar 30 17:45:57 2012
@@ -274,6 +274,27 @@ public class TestHCM {
assertTrue(c2 != c3);
}
+
+ /**
+ * This test checks that one can connect to the cluster with only the
+ * ZooKeeper quorum set. Other stuff like master address will be read
+ * from ZK by the client.
+ */
+ @Test(timeout = 10000)
+ public void testConnection() throws Exception{
+ // We create an empty config and add the ZK address.
+ Configuration c = new Configuration();
+ c.set(HConstants.ZOOKEEPER_QUORUM,
+ TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+ c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
+ TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
+
+ // This should be enough to connect
+ HConnection conn = HConnectionManager.getConnection(c);
+ assertTrue( conn.isMasterRunning() );
+ conn.close();
+ }
+
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java?rev=1307549&r1=1307548&r2=1307549&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
Fri Mar 30 17:45:57 2012
@@ -132,13 +132,13 @@ public class TestSplitTransactionOnClust
List<HRegion> daughters = cluster.getRegions(tableName);
assertTrue(daughters.size() >= 2);
// Assert the ephemeral node is up in zk.
- String path =
ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
+ String path = ZKAssign.getNodeName(TESTING_UTIL.getZooKeeperWatcher(),
hri.getEncodedName());
Stat stats =
-
t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path,
false);
+
TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path,
false);
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats="
+ stats);
RegionTransitionData rtd =
- ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
+ ZKAssign.getData(TESTING_UTIL.getZooKeeperWatcher(),
hri.getEncodedName());
// State could be SPLIT or SPLITTING.
assertTrue(rtd.getEventType().equals(EventType.RS_ZK_REGION_SPLIT) ||
@@ -158,7 +158,7 @@ public class TestSplitTransactionOnClust
assertTrue(daughters.contains(r));
}
// Finally assert that the ephemeral SPLIT znode was cleaned up.
- stats =
t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path,
false);
+ stats =
TESTING_UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().exists(path,
false);
LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats="
+ stats);
assertTrue(stats == null);
} finally {
@@ -195,7 +195,7 @@ public class TestSplitTransactionOnClust
int regionCount = server.getOnlineRegions().size();
// Insert into zk a blocking znode, a znode of same name as region
// so it gets in way of our splitting.
- ZKAssign.createNodeClosing(t.getConnection().getZooKeeperWatcher(),
+ ZKAssign.createNodeClosing(TESTING_UTIL.getZooKeeperWatcher(),
hri, new ServerName("any.old.server", 1234, -1));
// Now try splitting.... should fail. And each should successfully
// rollback.
@@ -208,7 +208,7 @@ public class TestSplitTransactionOnClust
assertEquals(regionCount, server.getOnlineRegions().size());
}
// Now clear the zknode
- ZKAssign.deleteClosingNode(t.getConnection().getZooKeeperWatcher(), hri);
+ ZKAssign.deleteClosingNode(TESTING_UTIL.getZooKeeperWatcher(), hri);
// Now try splitting and it should work.
split(hri, server, regionCount);
// Get daughters