Updated Branches: refs/heads/1.6.0-SNAPSHOT 7f37d96c3 -> 2610e821b
ACCUMULO-1999 Backport changes from 1664 which advertise the Master's random port Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/622fc7d2 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/622fc7d2 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/622fc7d2 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 622fc7d20744ae8a7e75ba0dc945777f94254b48 Parents: c2cd051 Author: Josh Elser <els...@apache.org> Authored: Tue Dec 10 14:55:18 2013 -0500 Committer: Josh Elser <els...@apache.org> Committed: Thu Dec 12 15:38:15 2013 -0500 ---------------------------------------------------------------------- .../1GB/native-standalone/generic_logger.xml | 2 +- conf/examples/1GB/standalone/generic_logger.xml | 2 +- .../2GB/native-standalone/generic_logger.xml | 2 +- conf/examples/2GB/standalone/generic_logger.xml | 2 +- .../3GB/native-standalone/generic_logger.xml | 2 +- conf/examples/3GB/standalone/generic_logger.xml | 2 +- .../512MB/native-standalone/generic_logger.xml | 2 +- .../512MB/standalone/generic_logger.xml | 2 +- .../org/apache/accumulo/core/Constants.java | 3 + .../accumulo/core/client/impl/MasterClient.java | 3 + .../apache/accumulo/fate/zookeeper/ZooLock.java | 5 + .../minicluster/MiniAccumuloCluster.java | 17 +- .../org/apache/accumulo/server/Accumulo.java | 64 +++++++- .../server/gc/SimpleGarbageCollector.java | 2 +- .../apache/accumulo/server/master/Master.java | 10 +- .../accumulo/server/monitor/LogService.java | 15 +- .../apache/accumulo/server/monitor/Monitor.java | 12 +- .../org/apache/accumulo/server/util/Info.java | 33 ++++ .../server/util/TNonblockingServerSocket.java | 157 +++++++++++++++++++ .../accumulo/server/util/TServerUtils.java | 6 +- .../java/org/apache/accumulo/start/Main.java | 6 +- .../accumulo/fate/zookeeper/ZooLockTest.java | 18 +++ 22 files changed, 345 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/1GB/native-standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/1GB/native-standalone/generic_logger.xml b/conf/examples/1GB/native-standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/1GB/native-standalone/generic_logger.xml +++ b/conf/examples/1GB/native-standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/1GB/standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/1GB/standalone/generic_logger.xml b/conf/examples/1GB/standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/1GB/standalone/generic_logger.xml +++ b/conf/examples/1GB/standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/2GB/native-standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/2GB/native-standalone/generic_logger.xml b/conf/examples/2GB/native-standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/2GB/native-standalone/generic_logger.xml +++ b/conf/examples/2GB/native-standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/2GB/standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/2GB/standalone/generic_logger.xml b/conf/examples/2GB/standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/2GB/standalone/generic_logger.xml +++ b/conf/examples/2GB/standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/3GB/native-standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/3GB/native-standalone/generic_logger.xml b/conf/examples/3GB/native-standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/3GB/native-standalone/generic_logger.xml +++ b/conf/examples/3GB/native-standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/3GB/standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/3GB/standalone/generic_logger.xml b/conf/examples/3GB/standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/3GB/standalone/generic_logger.xml +++ b/conf/examples/3GB/standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/512MB/native-standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/512MB/native-standalone/generic_logger.xml b/conf/examples/512MB/native-standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/512MB/native-standalone/generic_logger.xml +++ b/conf/examples/512MB/native-standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/512MB/standalone/generic_logger.xml ---------------------------------------------------------------------- diff --git a/conf/examples/512MB/standalone/generic_logger.xml b/conf/examples/512MB/standalone/generic_logger.xml index 5dc38ac..565bc82 100644 --- a/conf/examples/512MB/standalone/generic_logger.xml +++ b/conf/examples/512MB/standalone/generic_logger.xml @@ -43,7 +43,7 @@ <!-- Send all logging data to a centralized logger --> <appender name="N1" class="org.apache.log4j.net.SocketAppender"> <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> - <param name="port" value="4560"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> <param name="Threshold" value="WARN"/> </appender> http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/core/src/main/java/org/apache/accumulo/core/Constants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 17faff6..9bb3419 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -63,6 +63,9 @@ public class Constants { public static final String ZGC = "/gc"; public static final String ZGC_LOCK = ZGC + "/lock"; + public static final String ZMONITOR = "/monitor"; + public static final String ZMONITOR_LOG4J_PORT = ZMONITOR + "/log4j_port"; + public static final String ZCONFIG = "/config"; public static final String ZTSERVERS = "/tservers"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java index 81cc546..2a22c54 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java @@ -57,6 +57,9 @@ public class MasterClient { } String master = locations.get(0); + if (master.endsWith(":0")) + return null; + int portHint = instance.getConfiguration().getPort(Property.MASTER_CLIENTPORT); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java index fb2f3d8..8772a83 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java @@ -334,6 +334,11 @@ public class ZooLock implements Watcher { return lock != null; } + public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException { + if (getLockPath()!=null) + zooKeeper.getZooKeeper().setData(getLockPath(), b, -1); + } + @Override public synchronized void process(WatchedEvent event) { log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java index 3b4d8a3..01115ec 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java @@ -171,6 +171,15 @@ public class MiniAccumuloCluster { if (!siteConfig.containsKey(key)) fileWriter.append("<property><name>" + key + "</name><value>" + value + "</value></property>\n"); } + + /** + * Sets a given key with a random port for the value on the site config if it doesn't already exist. + */ + private void mergePropWithRandomPort(Map<String,String> siteConfig, String key) { + if (!siteConfig.containsKey(key)) { + siteConfig.put(key, "0"); + } + } /** * @@ -228,8 +237,6 @@ public class MiniAccumuloCluster { appendProp(fileWriter, Property.INSTANCE_DFS_DIR, accumuloDir.getAbsolutePath(), siteConfig); appendProp(fileWriter, Property.INSTANCE_ZK_HOST, "localhost:" + zooKeeperPort, siteConfig); appendProp(fileWriter, Property.INSTANCE_SECRET, INSTANCE_SECRET, siteConfig); - appendProp(fileWriter, Property.MASTER_CLIENTPORT, "" + PortUtils.getRandomFreePort(), siteConfig); - appendProp(fileWriter, Property.TSERV_CLIENTPORT, "" + PortUtils.getRandomFreePort(), siteConfig); appendProp(fileWriter, Property.TSERV_PORTSEARCH, "true", siteConfig); appendProp(fileWriter, Property.LOGGER_DIR, walogDir.getAbsolutePath(), siteConfig); appendProp(fileWriter, Property.TSERV_DATACACHE_SIZE, "10M", siteConfig); @@ -238,8 +245,12 @@ public class MiniAccumuloCluster { appendProp(fileWriter, Property.TSERV_WALOG_MAX_SIZE, "100M", siteConfig); appendProp(fileWriter, Property.TSERV_NATIVEMAP_ENABLED, "false", siteConfig); appendProp(fileWriter, Property.TRACE_TOKEN_PROPERTY_PREFIX + ".password", config.getRootPassword(), siteConfig); - appendProp(fileWriter, Property.TRACE_PORT, "" + PortUtils.getRandomFreePort(), siteConfig); appendProp(fileWriter, Property.GC_CYCLE_DELAY, "30s", siteConfig); + mergePropWithRandomPort(siteConfig, Property.MASTER_CLIENTPORT.getKey()); + mergePropWithRandomPort(siteConfig, Property.TRACE_PORT.getKey()); + mergePropWithRandomPort(siteConfig, Property.TSERV_CLIENTPORT.getKey()); + mergePropWithRandomPort(siteConfig, Property.MONITOR_PORT.getKey()); + mergePropWithRandomPort(siteConfig, Property.GC_PORT.getKey()); // since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5 appendProp(fileWriter, Property.TSERV_MAJC_DELAY, "3", siteConfig); http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/src/main/java/org/apache/accumulo/server/Accumulo.java index f8ca31a..33bb871 100644 --- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.Version; +import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.util.time.SimpleTimer; @@ -39,10 +40,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.log4j.helpers.FileWatchdog; import org.apache.log4j.helpers.LogLog; import org.apache.log4j.xml.DOMConfigurator; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; public class Accumulo { @@ -82,6 +87,57 @@ public class Accumulo { } } + private static class LogMonitor extends FileWatchdog implements Watcher { + String path; + + protected LogMonitor(String instance, String filename, int delay) { + super(filename); + setDelay(delay); + this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT; + } + + private void setMonitorPort() { + try { + String port = new String(ZooReaderWriter.getInstance().getData(path, null)); + System.setProperty("org.apache.accumulo.core.host.log.port", port); + log.info("Changing monitor log4j port to "+port); + doOnChange(); + } catch (Exception e) { + log.error("Error reading zookeeper data for monitor log4j port", e); + } + } + + @Override + public void run() { + try { + if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null) + setMonitorPort(); + log.info("Set watch for monitor log4j port"); + } catch (Exception e) { + log.error("Unable to set watch for monitor log4j port " + path); + } + super.run(); + } + + @Override + protected void doOnChange() { + LogManager.resetConfiguration(); + new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository()); + } + + @Override + public void process(WatchedEvent event) { + setMonitorPort(); + if (event.getPath() != null) { + try { + ZooReaderWriter.getInstance().exists(event.getPath(), this); + } catch (Exception ex) { + log.error("Unable to reset watch for monitor log4j port", ex); + } + } + } + } + public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException { System.setProperty("org.apache.accumulo.core.application", application); @@ -99,6 +155,9 @@ public class Accumulo { else System.setProperty("org.apache.accumulo.core.host.log", localhost); + int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT); + System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort)); + // Use a specific log config, if it exists String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application); if (!new File(logConfig).exists()) { @@ -109,7 +168,10 @@ public class Accumulo { LogLog.setQuietMode(true); // Configure logging - DOMConfigurator.configureAndWatch(logConfig, 5000); + if (logPort==0) + new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start(); + else + DOMConfigurator.configureAndWatch(logConfig, 5000); log.info(application + " starting"); log.info("Instance " + config.getInstance().getInstanceID()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java index 4278f5b..af8e308 100644 --- a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java +++ b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java @@ -418,7 +418,7 @@ public class SimpleGarbageCollector implements Iface { int port = instance.getConfiguration().getPort(Property.GC_PORT); long maxMessageSize = instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); try { - TServerUtils.startTServer(port, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize); + port = TServerUtils.startTServer(port, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize).port; } catch (Exception ex) { log.fatal(ex, ex); throw new RuntimeException(ex); http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/master/Master.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java index 779673a..5cb928b 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/Master.java +++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java @@ -149,6 +149,7 @@ import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.server.util.MetadataTable; import org.apache.accumulo.server.util.SystemPropUtil; import org.apache.accumulo.server.util.TServerUtils; +import org.apache.accumulo.server.util.TServerUtils.ServerPort; import org.apache.accumulo.server.util.TablePropUtil; import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException; import org.apache.accumulo.server.util.time.SimpleTimer; @@ -2114,8 +2115,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler())); - clientService = TServerUtils.startServer(getSystemConfiguration(), Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, - Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE).server; + ServerPort serverPort = TServerUtils.startServer(getSystemConfiguration(), Property.MASTER_CLIENTPORT, processor, "Master", + "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + clientService = serverPort.server; + InetSocketAddress sock = org.apache.accumulo.core.util.AddressUtil.parseAddress(hostname, serverPort.port); + String address = org.apache.accumulo.core.util.AddressUtil.toString(sock); + log.info("Setting master lock data to " + address); + masterLock.replaceLockData(address.getBytes()); while (!clientService.isServing()) { UtilWaitThread.sleep(100); http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java index a123e9f..ce5dab8 100644 --- a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java +++ b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java @@ -24,9 +24,13 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.net.SocketNode; @@ -55,6 +59,10 @@ public class LogService extends org.apache.log4j.AppenderSkeleton { } } + public int getLocalPort() { + return server.getLocalPort(); + } + public void run() { try { while (true) { @@ -70,9 +78,12 @@ public class LogService extends org.apache.log4j.AppenderSkeleton { } } - static void startLogListener(AccumuloConfiguration conf) { + static void startLogListener(AccumuloConfiguration conf, String instanceId) { try { - new Daemon(new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT))).start(); + SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT)); + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT, + Integer.toString(server.getLocalPort()).getBytes(), NodeExistsPolicy.OVERWRITE); + new Daemon(server).start(); } catch (Throwable t) { log.info("Unable to listen to cluster-wide ports", t); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java index 3904088..c373610 100644 --- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java +++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java @@ -50,6 +50,7 @@ import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -478,9 +479,18 @@ public class Monitor { server.addServlet(ShowTrace.class, "/trace/show"); if (server.isUsingSsl()) server.addServlet(ShellServlet.class, "/shell"); - LogService.startLogListener(Monitor.getSystemConfiguration()); server.start(); + try { + String monitorAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, server.getPort())); + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, monitorAddress.getBytes(), + NodeExistsPolicy.OVERWRITE); + log.info("Set monitor address in zookeeper to " + monitorAddress); + } catch (Exception ex) { + log.error("Unable to set monitor address in zookeeper"); + } + LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID()); + new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start(); // need to regularly fetch data so plot data is updated http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/Info.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/util/Info.java b/server/src/main/java/org/apache/accumulo/server/util/Info.java new file mode 100644 index 0000000..4f03d82 --- /dev/null +++ b/server/src/main/java/org/apache/accumulo/server/util/Info.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; + +public class Info { + public static void main(String[] args) throws Exception { + ZooReaderWriter zrw = ZooReaderWriter.getInstance(); + Instance instance = HdfsZooInstance.getInstance(); + System.out.println("monitor: " + new String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, null))); + System.out.println("masters: " + instance.getMasterLocations()); + System.out.println("zookeepers: " + instance.getZooKeepers()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java new file mode 100644 index 0000000..4154d9d --- /dev/null +++ b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.accumulo.server.util; + +import org.apache.log4j.Logger; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TTransportException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +/** + * Wrapper around ServerSocketChannel. + * + * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9. + * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket. + */ +public class TNonblockingServerSocket extends TNonblockingServerTransport { + private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName()); + + /** + * This channel is where all the nonblocking magic happens. + */ + private ServerSocketChannel serverSocketChannel = null; + + /** + * Underlying ServerSocket object + */ + private ServerSocket serverSocket_ = null; + + /** + * Timeout for client sockets from accept + */ + private int clientTimeout_ = 0; + + /** + * Creates just a port listening server socket + */ + public TNonblockingServerSocket(int port) throws TTransportException { + this(port, 0); + } + + /** + * Creates just a port listening server socket + */ + public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException { + this(new InetSocketAddress(port), clientTimeout); + } + + public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException { + this(bindAddr, 0); + } + + public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { + clientTimeout_ = clientTimeout; + try { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + + // Make server socket + serverSocket_ = serverSocketChannel.socket(); + // Prevent 2MSL delay problem on server restarts + serverSocket_.setReuseAddress(true); + // Bind to listening port + serverSocket_.bind(bindAddr); + } catch (IOException ioe) { + serverSocket_ = null; + throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); + } + } + + public void listen() throws TTransportException { + // Make sure not to block on accept + if (serverSocket_ != null) { + try { + serverSocket_.setSoTimeout(0); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + } + + protected TNonblockingSocket acceptImpl() throws TTransportException { + if (serverSocket_ == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); + } + try { + SocketChannel socketChannel = serverSocketChannel.accept(); + if (socketChannel == null) { + return null; + } + + TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); + tsocket.setTimeout(clientTimeout_); + return tsocket; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + + public void registerSelector(Selector selector) { + try { + // Register the server socket channel, indicating an interest in + // accepting new connections + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + } catch (ClosedChannelException e) { + // this shouldn't happen, ideally... + // TODO: decide what to do with this. + } + } + + public void close() { + if (serverSocket_ != null) { + try { + serverSocket_.close(); + } catch (IOException iox) { + log.warn("WARNING: Could not close server socket: " + iox.getMessage()); + } + serverSocket_ = null; + } + } + + public void interrupt() { + // The thread-safeness of this is dubious, but Java documentation suggests + // that it is safe to do this from a different thread context + close(); + } + + public int getPort() { + return serverSocket_.getLocalPort(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java index be14023..ef25513 100644 --- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java +++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java @@ -44,7 +44,6 @@ import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; -import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; @@ -111,7 +110,7 @@ public class TServerUtils { for (int i = 0; i < portsToSearch; i++) { int port = portHint + i; - if (portHint == 0) + if (portHint != 0 && i > 0) port = 1024 + random.nextInt(65535 - 1024); if (port > 65535) port = 1024 + port % (65535 - 1024); @@ -213,6 +212,9 @@ public class TServerUtils { public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { TNonblockingServerSocket transport = new TNonblockingServerSocket(port); + if (port == 0) { + port = transport.getPort(); + } THsHaServer.Args options = new THsHaServer.Args(transport); options.protocolFactory(ThriftUtil.protocolFactory()); options.transportFactory(ThriftUtil.transportFactory(maxMessageSize)); http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/start/src/main/java/org/apache/accumulo/start/Main.java ---------------------------------------------------------------------- diff --git a/start/src/main/java/org/apache/accumulo/start/Main.java b/start/src/main/java/org/apache/accumulo/start/Main.java index a93a9d8..cbc90c6 100644 --- a/start/src/main/java/org/apache/accumulo/start/Main.java +++ b/start/src/main/java/org/apache/accumulo/start/Main.java @@ -72,9 +72,11 @@ public class Main { } else if (args[0].equals("rfile-info")) { runTMP = cl.loadClass("org.apache.accumulo.core.file.rfile.PrintInfo"); } else if (args[0].equals("login-info")) { - runTMP = cl.loadClass("org.apache.accumulo.core.util.LoginProperties"); + runTMP = cl.loadClass("org.apache.accumulo.server.util.LoginProperties"); } else if (args[0].equals("zookeeper")) { runTMP = cl.loadClass("org.apache.accumulo.server.util.ZooKeeperMain"); + } else if (args[0].equals("info")) { + runTMP = cl.loadClass("org.apache.accumulo.server.util.Info"); } else { try { runTMP = cl.loadClass(args[0]); @@ -118,6 +120,6 @@ public class Main { } private static void printUsage() { - System.out.println("accumulo init | master | tserver | monitor | shell | admin | gc | classpath | rfile-info | login-info | tracer | proxy | zookeeper | <accumulo class> args"); + System.out.println("accumulo init | master | tserver | monitor | shell | admin | gc | classpath | rfile-info | login-info | tracer | proxy | zookeeper | info | version | <accumulo class> args"); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java index 8a2a6ef..0c57250 100644 --- a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java +++ b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java @@ -328,6 +328,24 @@ public class ZooLockTest { zl.unlock(); } + @Test(timeout = 10000) + public void testChangeData() throws Exception { + String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++; + ZooKeeper zk = new ZooKeeper(accumulo.getZooKeepers(), 1000, null); + zk.addAuthInfo("digest", "secret".getBytes()); + zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + ZooLock zl = new ZooLock(accumulo.getZooKeepers(), 1000, "digest", "secret".getBytes(), parent); + + TestALW lw = new TestALW(); + + zl.lockAsync(lw, "test1".getBytes()); + Assert.assertEquals("test1", new String(zk.getData(zl.getLockPath(), null, null))); + + zl.replaceLockData("test2".getBytes()); + Assert.assertEquals("test2", new String(zk.getData(zl.getLockPath(), null, null))); + } + @AfterClass public static void tearDownMiniCluster() throws Exception { accumulo.stop();