http://git-wip-us.apache.org/repos/asf/accumulo/blob/7eb838e3/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java ---------------------------------------------------------------------- diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java index 420533a,0000000..c9c77b8 mode 100644,000000..100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java @@@ -1,317 -1,0 +1,319 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.fate.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; ++import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +/** + * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper. + * + */ - public class ZooCache { ++public class ZooCache implements Closeable { + private static final Logger log = Logger.getLogger(ZooCache.class); + + private ZCacheWatcher watcher = new ZCacheWatcher(); + private Watcher externalWatcher = null; + + private HashMap<String,byte[]> cache; + private HashMap<String,Stat> statCache; + private HashMap<String,List<String>> childrenCache; + + private ZooReader zReader; + + private ZooKeeper getZooKeeper() { + return zReader.getZooKeeper(); + } + + private class ZCacheWatcher implements Watcher { + @Override + public void process(WatchedEvent event) { + + if (log.isTraceEnabled()) + log.trace(event); + + switch (event.getType()) { + case NodeDataChanged: + case NodeChildrenChanged: + case NodeCreated: + case NodeDeleted: + remove(event.getPath()); + break; + case None: + switch (event.getState()) { + case Disconnected: + if (log.isTraceEnabled()) + log.trace("Zoo keeper connection disconnected, clearing cache"); + clear(); + break; + case SyncConnected: + break; + case Expired: + if (log.isTraceEnabled()) + log.trace("Zoo keeper connection expired, clearing cache"); + clear(); + break; + default: + log.warn("Unhandled: " + event); + } + break; + default: + log.warn("Unhandled: " + event); + } + + if (externalWatcher != null) { + externalWatcher.process(event); + } + } + } + + public ZooCache(String zooKeepers, int sessionTimeout) { + this(zooKeepers, sessionTimeout, null); + } + + public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) { + this(new ZooReader(zooKeepers, sessionTimeout), watcher); + } + + public ZooCache(ZooReader reader, Watcher watcher) { + this.zReader = reader; + this.cache = new HashMap<String,byte[]>(); + this.statCache = new HashMap<String,Stat>(); + this.childrenCache = new HashMap<String,List<String>>(); + this.externalWatcher = watcher; + } + + private static interface ZooRunnable { + void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException; + } + + private synchronized void retry(ZooRunnable op) { + + int sleepTime = 100; + + while (true) { + + ZooKeeper zooKeeper = getZooKeeper(); + + try { + op.run(zooKeeper); + return; + + } catch (KeeperException e) { + if (e.code() == Code.NONODE) { + log.error("Looked up non existant node in cache " + e.getPath(), e); + } + log.warn("Zookeeper error, will retry", e); + } catch (InterruptedException e) { + log.info("Zookeeper error, will retry", e); + } catch (ConcurrentModificationException e) { + log.debug("Zookeeper was modified, will retry"); + } + + try { + // do not hold lock while sleeping + wait(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (sleepTime < 10000) + sleepTime = (int) (sleepTime + sleepTime * Math.random()); + + } + } + + public synchronized List<String> getChildren(final String zPath) { + + ZooRunnable zr = new ZooRunnable() { + + @Override + public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { + + if (childrenCache.containsKey(zPath)) + return; + + try { + List<String> children = zooKeeper.getChildren(zPath, watcher); + childrenCache.put(zPath, children); + } catch (KeeperException ke) { + if (ke.code() != Code.NONODE) { + throw ke; + } + } + } + + }; + + retry(zr); + + List<String> children = childrenCache.get(zPath); + if (children == null) { + return null; + } + return Collections.unmodifiableList(children); + } + + public synchronized byte[] get(final String zPath) { + return get(zPath, null); + } + + public synchronized byte[] get(final String zPath, Stat stat) { + ZooRunnable zr = new ZooRunnable() { + + @Override + public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { + + if (cache.containsKey(zPath)) + return; + + /* + * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existance, it will be added to + * the cache. But this notification of a node coming into existance will only be given if exists() was previously called. + * + * If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then + * non-existance can not be cached. + */ + + Stat stat = zooKeeper.exists(zPath, watcher); + + byte[] data = null; + + if (stat == null) { + if (log.isTraceEnabled()) + log.trace("zookeeper did not contain " + zPath); + } else { + try { + data = zooKeeper.getData(zPath, watcher, stat); + } catch (KeeperException.BadVersionException e1) { + throw new ConcurrentModificationException(); + } catch (KeeperException.NoNodeException e2) { + throw new ConcurrentModificationException(); + } + if (log.isTraceEnabled()) + log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data))); + } + if (log.isTraceEnabled()) + log.trace("putting " + zPath + " " + (data == null ? null : new String(data)) + " in cache"); + put(zPath, data, stat); + } + + }; + + retry(zr); + + if (stat != null) { + Stat cstat = statCache.get(zPath); + if (cstat != null) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + cstat.write(dos); + dos.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); + stat.readFields(dis); + + dis.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + return cache.get(zPath); + } + + private synchronized void put(String zPath, byte[] data, Stat stat) { + cache.put(zPath, data); + statCache.put(zPath, stat); + } + + private synchronized void remove(String zPath) { + if (log.isTraceEnabled()) + log.trace("removing " + zPath + " from cache"); + cache.remove(zPath); + childrenCache.remove(zPath); + statCache.remove(zPath); + } + + public synchronized void clear() { + cache.clear(); + childrenCache.clear(); + statCache.clear(); + } + + public synchronized void clear(String zPath) { + + for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) { + String path = i.next(); + if (path.startsWith(zPath)) + i.remove(); + } + + for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) { + String path = i.next(); + if (path.startsWith(zPath)) + i.remove(); + } + + for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) { + String path = i.next(); + if (path.startsWith(zPath)) + i.remove(); + } + } + + private static Map<String,ZooCache> instances = new HashMap<String,ZooCache>(); + + public static synchronized ZooCache getInstance(String zooKeepers, int sessionTimeout) { + String key = zooKeepers + ":" + sessionTimeout; + ZooCache zc = instances.get(key); + if (zc == null) { + zc = new ZooCache(zooKeepers, sessionTimeout); + instances.put(key, zc); + } + + return zc; + } + - public void close() throws InterruptedException { ++ @Override ++ public void close() { + cache.clear(); + statCache.clear(); + childrenCache.clear(); + zReader.close(); + } +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7eb838e3/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java ---------------------------------------------------------------------- diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java index e11f570,0000000..5fc9595 mode 100644,000000..100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java @@@ -1,109 -1,0 +1,118 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.fate.zookeeper; + ++import java.io.Closeable; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.data.Stat; + - public class ZooReader implements IZooReader { ++public class ZooReader implements IZooReader, Closeable { + + protected String keepers; + protected int timeout; + + protected ZooKeeper getSession(String keepers, int timeout, String scheme, byte[] auth) { + return ZooSession.getSession(keepers, timeout, scheme, auth); + } + + protected ZooKeeper getZooKeeper() { + return getSession(keepers, timeout, null, null); + } + + @Override + public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException { + return getZooKeeper().getData(zPath, false, stat); + } + + @Override + public Stat getStatus(String zPath) throws KeeperException, InterruptedException { + return getZooKeeper().exists(zPath, false); + } + + @Override + public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException { + return getZooKeeper().exists(zPath, watcher); + } + + @Override + public List<String> getChildren(String zPath) throws KeeperException, InterruptedException { + return getZooKeeper().getChildren(zPath, false); + } + + @Override + public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException { + return getZooKeeper().getChildren(zPath, watcher); + } + + @Override + public boolean exists(String zPath) throws KeeperException, InterruptedException { + return getZooKeeper().exists(zPath, false) != null; + } + + @Override + public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException { + return getZooKeeper().exists(zPath, watcher) != null; + } + + @Override + public void sync(final String path) throws KeeperException, InterruptedException { + final AtomicInteger rc = new AtomicInteger(); + final AtomicBoolean waiter = new AtomicBoolean(false); + getZooKeeper().sync(path, new VoidCallback() { + @Override + public void processResult(int code, String arg1, Object arg2) { + rc.set(code); + synchronized (waiter) { + waiter.set(true); + waiter.notifyAll(); + } + }}, null); + synchronized (waiter) { + while (!waiter.get()) + waiter.wait(); + } + Code code = Code.get(rc.get()); + if (code != KeeperException.Code.OK) { + throw KeeperException.create(code); + } + } + + public ZooReader(String keepers, int timeout) { + this.keepers = keepers; + this.timeout = timeout; + } + - public void close() throws InterruptedException { - getZooKeeper().close(); ++ /** ++ * Closes this reader. If closure of the underlying session is interrupted, ++ * this method sets the calling thread's interrupt status. ++ */ ++ public void close() { ++ try { ++ getZooKeeper().close(); ++ } catch (InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7eb838e3/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java ---------------------------------------------------------------------- diff --cc server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java index f12dca5,0000000..154c9c2 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java +++ b/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java @@@ -1,213 -1,0 +1,209 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.client; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.ConnectorImpl; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.security.CredentialHelper; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.OpTimer; +import org.apache.accumulo.core.util.StringUtil; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * An implementation of Instance that looks in HDFS and ZooKeeper to find the master and root tablet location. + * + */ +public class HdfsZooInstance implements Instance { + + public static class AccumuloNotInitializedException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public AccumuloNotInitializedException(String string) { + super(string); + } + } + + private HdfsZooInstance() { + AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration(); + zooCache = new ZooCache(acuConf.get(Property.INSTANCE_ZK_HOST), (int) acuConf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)); + } + + private static HdfsZooInstance cachedHdfsZooInstance = null; + + public static synchronized Instance getInstance() { + if (cachedHdfsZooInstance == null) + cachedHdfsZooInstance = new HdfsZooInstance(); + return cachedHdfsZooInstance; + } + + private static ZooCache zooCache; + private static String instanceId = null; + private static final Logger log = Logger.getLogger(HdfsZooInstance.class); + + @Override + public String getRootTabletLocation() { + String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION; + + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zoocache."); + + byte[] loc = zooCache.get(zRootLocPath); + + opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); + + if (loc == null) { + return null; + } + + return new String(loc).split("\\|")[0]; + } + + @Override + public List<String> getMasterLocations() { + + String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK; + + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache."); + + byte[] loc = ZooLock.getLockData(zooCache, masterLocPath, null); + + opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); + + if (loc == null) { + return Collections.emptyList(); + } + + return Collections.singletonList(new String(loc)); + } + + @Override + public String getInstanceID() { + if (instanceId == null) + _getInstanceID(); + return instanceId; + } + + private static synchronized void _getInstanceID() { + if (instanceId == null) { + @SuppressWarnings("deprecation") + String instanceIdFromFile = ZooKeeperInstance.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation()); + instanceId = instanceIdFromFile; + } + } + + @Override + public String getInstanceName() { + return ZooKeeperInstance.lookupInstanceName(zooCache, UUID.fromString(getInstanceID())); + } + + @Override + public String getZooKeepers() { + return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST); + } + + @Override + public int getZooKeepersSessionTimeOut() { + return (int) ServerConfiguration.getSiteConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + } + + @Override + // Not really deprecated, just not for client use + public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException { + return getConnector(CredentialHelper.create(principal, token, getInstanceID())); + } + + @SuppressWarnings("deprecation") + private Connector getConnector(TCredentials cred) throws AccumuloException, AccumuloSecurityException { + return new ConnectorImpl(this, cred); + } + + @Deprecated + @Override + // Not really deprecated, just not for client use + public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, new PasswordToken(pass)); + } + + @Deprecated + @Override + // Not really deprecated, just not for client use + public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, ByteBufferUtil.toBytes(pass)); + } + + @Deprecated + @Override + public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, TextUtil.getBytes(new Text(pass.toString()))); + } + + private AccumuloConfiguration conf = null; + + @Override + public AccumuloConfiguration getConfiguration() { + if (conf == null) + conf = new ServerConfiguration(this).getConfiguration(); + return conf; + } + + @Override + public void setConfiguration(AccumuloConfiguration conf) { + this.conf = conf; + } + + public static void main(String[] args) { + Instance instance = HdfsZooInstance.getInstance(); + System.out.println("Instance Name: " + instance.getInstanceName()); + System.out.println("Instance ID: " + instance.getInstanceID()); + System.out.println("ZooKeepers: " + instance.getZooKeepers()); + System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", ")); + } + + @Override - public void close() throws AccumuloException { - try { - zooCache.close(); - } catch (InterruptedException e) { - throw new AccumuloException("Issues closing ZooKeeper, try again"); - } ++ public void close() { ++ zooCache.close(); + } + + @Deprecated + @Override + public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException { + return getConnector(auth.user, auth.getPassword()); + } +}