Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 7f37d96c3 -> 2610e821b

ACCUMULO-1999 Backport changes from 1664 which advertise the Master's random 
port


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/622fc7d2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/622fc7d2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/622fc7d2

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 622fc7d20744ae8a7e75ba0dc945777f94254b48
Parents: c2cd051
Author: Josh Elser <els...@apache.org>
Authored: Tue Dec 10 14:55:18 2013 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Thu Dec 12 15:38:15 2013 -0500

----------------------------------------------------------------------
 .../1GB/native-standalone/generic_logger.xml    |   2 +-
 conf/examples/1GB/standalone/generic_logger.xml |   2 +-
 .../2GB/native-standalone/generic_logger.xml    |   2 +-
 conf/examples/2GB/standalone/generic_logger.xml |   2 +-
 .../3GB/native-standalone/generic_logger.xml    |   2 +-
 conf/examples/3GB/standalone/generic_logger.xml |   2 +-
 .../512MB/native-standalone/generic_logger.xml  |   2 +-
 .../512MB/standalone/generic_logger.xml         |   2 +-
 .../org/apache/accumulo/core/Constants.java     |   3 +
 .../accumulo/core/client/impl/MasterClient.java |   3 +
 .../apache/accumulo/fate/zookeeper/ZooLock.java |   5 +
 .../minicluster/MiniAccumuloCluster.java        |  17 +-
 .../org/apache/accumulo/server/Accumulo.java    |  64 +++++++-
 .../server/gc/SimpleGarbageCollector.java       |   2 +-
 .../apache/accumulo/server/master/Master.java   |  10 +-
 .../accumulo/server/monitor/LogService.java     |  15 +-
 .../apache/accumulo/server/monitor/Monitor.java |  12 +-
 .../org/apache/accumulo/server/util/Info.java   |  33 ++++
 .../server/util/TNonblockingServerSocket.java   | 157 +++++++++++++++++++
 .../accumulo/server/util/TServerUtils.java      |   6 +-
 .../java/org/apache/accumulo/start/Main.java    |   6 +-
 .../accumulo/fate/zookeeper/ZooLockTest.java    |  18 +++
 22 files changed, 345 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/1GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/native-standalone/generic_logger.xml 
b/conf/examples/1GB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/1GB/native-standalone/generic_logger.xml
+++ b/conf/examples/1GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/1GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/standalone/generic_logger.xml 
b/conf/examples/1GB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/1GB/standalone/generic_logger.xml
+++ b/conf/examples/1GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/2GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/native-standalone/generic_logger.xml 
b/conf/examples/2GB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/2GB/native-standalone/generic_logger.xml
+++ b/conf/examples/2GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/2GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/standalone/generic_logger.xml 
b/conf/examples/2GB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/2GB/standalone/generic_logger.xml
+++ b/conf/examples/2GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/3GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/native-standalone/generic_logger.xml 
b/conf/examples/3GB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/3GB/native-standalone/generic_logger.xml
+++ b/conf/examples/3GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/3GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/standalone/generic_logger.xml 
b/conf/examples/3GB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/3GB/standalone/generic_logger.xml
+++ b/conf/examples/3GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/512MB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/native-standalone/generic_logger.xml 
b/conf/examples/512MB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/512MB/native-standalone/generic_logger.xml
+++ b/conf/examples/512MB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/512MB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/standalone/generic_logger.xml 
b/conf/examples/512MB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/512MB/standalone/generic_logger.xml
+++ b/conf/examples/512MB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java 
b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 17faff6..9bb3419 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -63,6 +63,9 @@ public class Constants {
   public static final String ZGC = "/gc";
   public static final String ZGC_LOCK = ZGC + "/lock";
   
+  public static final String ZMONITOR = "/monitor";
+  public static final String ZMONITOR_LOG4J_PORT = ZMONITOR + "/log4j_port";
+  
   public static final String ZCONFIG = "/config";
   
   public static final String ZTSERVERS = "/tservers";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java 
b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index 81cc546..2a22c54 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@ -57,6 +57,9 @@ public class MasterClient {
     }
     
     String master = locations.get(0);
+    if (master.endsWith(":0"))
+      return null;
+    
     int portHint = 
instance.getConfiguration().getPort(Property.MASTER_CLIENTPORT);
     
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java 
b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
index fb2f3d8..8772a83 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
@@ -334,6 +334,11 @@ public class ZooLock implements Watcher {
     return lock != null;
   }
   
+  public synchronized void replaceLockData(byte[] b) throws KeeperException, 
InterruptedException {
+    if (getLockPath()!=null)
+      zooKeeper.getZooKeeper().setData(getLockPath(), b, -1);
+  }
+  
   @Override
   public synchronized void process(WatchedEvent event) {
     log.debug("event " + event.getPath() + " " + event.getType() + " " + 
event.getState());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
----------------------------------------------------------------------
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
 
b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
index 3b4d8a3..01115ec 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
@@ -171,6 +171,15 @@ public class MiniAccumuloCluster {
     if (!siteConfig.containsKey(key))
       fileWriter.append("<property><name>" + key + "</name><value>" + value + 
"</value></property>\n");
   }
+
+  /**
+   * Sets a given key with a random port for the value on the site config if 
it doesn't already exist.
+   */
+  private void mergePropWithRandomPort(Map<String,String> siteConfig, String 
key) {
+    if (!siteConfig.containsKey(key)) {
+      siteConfig.put(key, "0");
+    }
+  }
   
   /**
    * 
@@ -228,8 +237,6 @@ public class MiniAccumuloCluster {
     appendProp(fileWriter, Property.INSTANCE_DFS_DIR, 
accumuloDir.getAbsolutePath(), siteConfig);
     appendProp(fileWriter, Property.INSTANCE_ZK_HOST, "localhost:" + 
zooKeeperPort, siteConfig);
     appendProp(fileWriter, Property.INSTANCE_SECRET, INSTANCE_SECRET, 
siteConfig);
-    appendProp(fileWriter, Property.MASTER_CLIENTPORT, "" + 
PortUtils.getRandomFreePort(), siteConfig);
-    appendProp(fileWriter, Property.TSERV_CLIENTPORT, "" + 
PortUtils.getRandomFreePort(), siteConfig);
     appendProp(fileWriter, Property.TSERV_PORTSEARCH, "true", siteConfig);
     appendProp(fileWriter, Property.LOGGER_DIR, walogDir.getAbsolutePath(), 
siteConfig);
     appendProp(fileWriter, Property.TSERV_DATACACHE_SIZE, "10M", siteConfig);
@@ -238,8 +245,12 @@ public class MiniAccumuloCluster {
     appendProp(fileWriter, Property.TSERV_WALOG_MAX_SIZE, "100M", siteConfig);
     appendProp(fileWriter, Property.TSERV_NATIVEMAP_ENABLED, "false", 
siteConfig);
     appendProp(fileWriter, Property.TRACE_TOKEN_PROPERTY_PREFIX + ".password", 
config.getRootPassword(), siteConfig);
-    appendProp(fileWriter, Property.TRACE_PORT, "" + 
PortUtils.getRandomFreePort(), siteConfig);
     appendProp(fileWriter, Property.GC_CYCLE_DELAY, "30s", siteConfig);
+    mergePropWithRandomPort(siteConfig, Property.MASTER_CLIENTPORT.getKey());
+    mergePropWithRandomPort(siteConfig, Property.TRACE_PORT.getKey());
+    mergePropWithRandomPort(siteConfig, Property.TSERV_CLIENTPORT.getKey());
+    mergePropWithRandomPort(siteConfig, Property.MONITOR_PORT.getKey());
+    mergePropWithRandomPort(siteConfig, Property.GC_PORT.getKey());
     
     // since there is a small amount of memory, check more frequently for 
majc... setting may not be needed in 1.5
     appendProp(fileWriter, Property.TSERV_MAJC_DELAY, "3", siteConfig);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java 
b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index f8ca31a..33bb871 100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.util.time.SimpleTimer;
@@ -39,10 +40,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 
 public class Accumulo {
   
@@ -82,6 +87,57 @@ public class Accumulo {
     }
   }
   
+  private static class LogMonitor extends FileWatchdog implements Watcher {
+    String path;
+    
+    protected LogMonitor(String instance, String filename, int delay) {
+      super(filename);
+      setDelay(delay);
+      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+    }
+    
+    private void setMonitorPort() {
+      try {
+        String port = new String(ZooReaderWriter.getInstance().getData(path, 
null));
+        System.setProperty("org.apache.accumulo.core.host.log.port", port);
+        log.info("Changing monitor log4j port to "+port);
+        doOnChange();
+      } catch (Exception e) {
+        log.error("Error reading zookeeper data for monitor log4j port", e);
+      }
+    }
+    
+    @Override
+    public void run() {
+      try {
+        if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != 
null)
+          setMonitorPort();
+        log.info("Set watch for monitor log4j port");
+      } catch (Exception e) {
+        log.error("Unable to set watch for monitor log4j port " + path);
+      }
+      super.run();
+    }
+    
+    @Override
+    protected void doOnChange() {
+      LogManager.resetConfiguration();
+      new DOMConfigurator().doConfigure(filename, 
LogManager.getLoggerRepository());
+    }
+    
+    @Override
+    public void process(WatchedEvent event) {
+      setMonitorPort();
+      if (event.getPath() != null) {
+        try {
+          ZooReaderWriter.getInstance().exists(event.getPath(), this);
+        } catch (Exception ex) {
+          log.error("Unable to reset watch for monitor log4j port", ex);
+        }
+      }
+    }
+  }
+  
   public static void init(FileSystem fs, ServerConfiguration config, String 
application) throws UnknownHostException {
     
     System.setProperty("org.apache.accumulo.core.application", application);
@@ -99,6 +155,9 @@ public class Accumulo {
     else
       System.setProperty("org.apache.accumulo.core.host.log", localhost);
     
+    int logPort = 
config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+    System.setProperty("org.apache.accumulo.core.host.log.port", 
Integer.toString(logPort));
+    
     // Use a specific log config, if it exists
     String logConfig = String.format("%s/%s_logger.xml", 
System.getenv("ACCUMULO_CONF_DIR"), application);
     if (!new File(logConfig).exists()) {
@@ -109,7 +168,10 @@ public class Accumulo {
     LogLog.setQuietMode(true);
     
     // Configure logging
-    DOMConfigurator.configureAndWatch(logConfig, 5000);
+    if (logPort==0)
+      new LogMonitor(config.getInstance().getInstanceID(), logConfig, 
5000).start();
+    else
+      DOMConfigurator.configureAndWatch(logConfig, 5000);
     
     log.info(application + " starting");
     log.info("Instance " + config.getInstance().getInstanceID());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
 
b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
index 4278f5b..af8e308 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
@@ -418,7 +418,7 @@ public class SimpleGarbageCollector implements Iface {
     int port = instance.getConfiguration().getPort(Property.GC_PORT);
     long maxMessageSize = 
instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
     try {
-      TServerUtils.startTServer(port, processor, 
this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize);
+      port = TServerUtils.startTServer(port, processor, 
this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, 
maxMessageSize).port;
     } catch (Exception ex) {
       log.fatal(ex, ex);
       throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java 
b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index 779673a..5cb928b 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -149,6 +149,7 @@ import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.SystemPropUtil;
 import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
 import org.apache.accumulo.server.util.time.SimpleTimer;
@@ -2114,8 +2115,13 @@ public class Master implements LiveTServerSet.Listener, 
TableObserver, CurrentSt
     }
     
     Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new 
MasterClientServiceHandler()));
-    clientService = TServerUtils.startServer(getSystemConfiguration(), 
Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service 
Handler", null,
-        Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE).server;
+    ServerPort serverPort = TServerUtils.startServer(getSystemConfiguration(), 
Property.MASTER_CLIENTPORT, processor, "Master",
+        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, 
Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+    clientService = serverPort.server;
+    InetSocketAddress sock = 
org.apache.accumulo.core.util.AddressUtil.parseAddress(hostname, 
serverPort.port);
+    String address = org.apache.accumulo.core.util.AddressUtil.toString(sock);
+    log.info("Setting master lock data to " + address);
+    masterLock.replaceLockData(address.getBytes());
     
     while (!clientService.isServing()) {
       UtilWaitThread.sleep(100);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java 
b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
index a123e9f..ce5dab8 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@ -24,9 +24,13 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.net.SocketNode;
@@ -55,6 +59,10 @@ public class LogService extends 
org.apache.log4j.AppenderSkeleton {
       }
     }
     
+    public int getLocalPort() {
+      return server.getLocalPort();
+    }
+    
     public void run() {
       try {
         while (true) {
@@ -70,9 +78,12 @@ public class LogService extends 
org.apache.log4j.AppenderSkeleton {
     }
   }
   
-  static void startLogListener(AccumuloConfiguration conf) {
+  static void startLogListener(AccumuloConfiguration conf, String instanceId) {
     try {
-      new Daemon(new 
SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT))).start();
+      SocketServer server = new 
SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
+      
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + 
Constants.ZMONITOR_LOG4J_PORT,
+          Integer.toString(server.getLocalPort()).getBytes(), 
NodeExistsPolicy.OVERWRITE);
+      new Daemon(server).start();
     } catch (Throwable t) {
       log.info("Unable to listen to cluster-wide ports", t);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java 
b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
index 3904088..c373610 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -478,9 +479,18 @@ public class Monitor {
     server.addServlet(ShowTrace.class, "/trace/show");
     if (server.isUsingSsl())
       server.addServlet(ShellServlet.class, "/shell");
-    LogService.startLogListener(Monitor.getSystemConfiguration());
     server.start();
     
+    try {
+      String monitorAddress = 
org.apache.accumulo.core.util.AddressUtil.toString(new 
InetSocketAddress(hostname, server.getPort()));
+      
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + 
Constants.ZMONITOR, monitorAddress.getBytes(),
+          NodeExistsPolicy.OVERWRITE);
+      log.info("Set monitor address in zookeeper to " + monitorAddress);
+    } catch (Exception ex) {
+      log.error("Unable to set monitor address in zookeeper");
+    }
+    LogService.startLogListener(Monitor.getSystemConfiguration(), 
instance.getInstanceID());
+    
     new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), 
"ZooKeeperStatus").start();
     
     // need to regularly fetch data so plot data is updated

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/Info.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/Info.java 
b/server/src/main/java/org/apache/accumulo/server/util/Info.java
new file mode 100644
index 0000000..4f03d82
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/util/Info.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+public class Info {
+  public static void main(String[] args) throws Exception {
+    ZooReaderWriter zrw = ZooReaderWriter.getInstance();
+    Instance instance = HdfsZooInstance.getInstance();
+    System.out.println("monitor: " + new 
String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, null)));
+    System.out.println("masters: " + instance.getMasterLocations());
+    System.out.println("zookeepers: " + instance.getZooKeepers());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
 
b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
new file mode 100644
index 0000000..4154d9d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.util;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ * 
+ * This class is copied from 
org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
+ * The only change (apart from the logging statements) is the addition of the 
{@link #getPort()} method to retrieve the port used by the ServerSocket.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger log = 
Logger.getLogger(TNonblockingServerTransport.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws 
TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws 
TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int 
clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on address 
" + bindAddr.toString() + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        sx.printStackTrace();
+      }
+    }
+  }
+
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No 
underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this.
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        log.warn("WARNING: Could not close server socket: " + 
iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  public int getPort() {
+    return serverSocket_.getLocalPort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java 
b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index be14023..ef25513 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -44,7 +44,6 @@ import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
@@ -111,7 +110,7 @@ public class TServerUtils {
       
       for (int i = 0; i < portsToSearch; i++) {
         int port = portHint + i;
-        if (portHint == 0)
+        if (portHint != 0 && i > 0)
           port = 1024 + random.nextInt(65535 - 1024);
         if (port > 65535)
           port = 1024 + port % (65535 - 1024);
@@ -213,6 +212,9 @@ public class TServerUtils {
   public static ServerPort startHsHaServer(int port, TProcessor processor, 
final String serverName, String threadName, final int numThreads,
       long timeBetweenThreadChecks, long maxMessageSize) throws 
TTransportException {
     TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
+    if (port == 0) {
+      port = transport.getPort();
+    }
     THsHaServer.Args options = new THsHaServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/start/src/main/java/org/apache/accumulo/start/Main.java
----------------------------------------------------------------------
diff --git a/start/src/main/java/org/apache/accumulo/start/Main.java 
b/start/src/main/java/org/apache/accumulo/start/Main.java
index a93a9d8..cbc90c6 100644
--- a/start/src/main/java/org/apache/accumulo/start/Main.java
+++ b/start/src/main/java/org/apache/accumulo/start/Main.java
@@ -72,9 +72,11 @@ public class Main {
       } else if (args[0].equals("rfile-info")) {
         runTMP = cl.loadClass("org.apache.accumulo.core.file.rfile.PrintInfo");
       } else if (args[0].equals("login-info")) {
-        runTMP = cl.loadClass("org.apache.accumulo.core.util.LoginProperties");
+        runTMP = 
cl.loadClass("org.apache.accumulo.server.util.LoginProperties");
       } else if (args[0].equals("zookeeper")) {
         runTMP = cl.loadClass("org.apache.accumulo.server.util.ZooKeeperMain");
+      } else if (args[0].equals("info")) {
+        runTMP = cl.loadClass("org.apache.accumulo.server.util.Info");
       } else {
         try {
           runTMP = cl.loadClass(args[0]);
@@ -118,6 +120,6 @@ public class Main {
   }
   
   private static void printUsage() {
-    System.out.println("accumulo init | master | tserver | monitor | shell | 
admin | gc | classpath | rfile-info | login-info | tracer | proxy | zookeeper | 
<accumulo class> args");
+    System.out.println("accumulo init | master | tserver | monitor | shell | 
admin | gc | classpath | rfile-info | login-info | tracer | proxy | zookeeper | 
info | version | <accumulo class> args");
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java 
b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
index 8a2a6ef..0c57250 100644
--- a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
+++ b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
@@ -328,6 +328,24 @@ public class ZooLockTest {
     zl.unlock();
   }
   
+  @Test(timeout = 10000)
+  public void testChangeData() throws Exception {
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    ZooKeeper zk = new ZooKeeper(accumulo.getZooKeepers(), 1000, null);
+    zk.addAuthInfo("digest", "secret".getBytes());
+    zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+    
+    ZooLock zl = new ZooLock(accumulo.getZooKeepers(), 1000, "digest", 
"secret".getBytes(), parent);
+    
+    TestALW lw = new TestALW();
+    
+    zl.lockAsync(lw, "test1".getBytes());
+    Assert.assertEquals("test1", new String(zk.getData(zl.getLockPath(), null, 
null)));
+    
+    zl.replaceLockData("test2".getBytes());
+    Assert.assertEquals("test2", new String(zk.getData(zl.getLockPath(), null, 
null)));
+  }
+  
   @AfterClass
   public static void tearDownMiniCluster() throws Exception {
     accumulo.stop();

Reply via email to