Author: sseth
Date: Mon Mar  9 18:09:46 2015
New Revision: 1665314

URL: http://svn.apache.org/r1665314
Log:
HIVE-9775. LLAP: Add a MiniLLAPCluster for tests. (Siddharth Seth)

Added:
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java?rev=1665314&r1=1665313&r2=1665314&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java Mon Mar  9 18:09:46 2015
@@ -17,11 +17,14 @@ package org.apache.hadoop.hive.llap.daem
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
@@ -33,6 +36,7 @@ import org.apache.hadoop.hive.llap.shuff
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
@@ -42,59 +46,72 @@ public class LlapDaemon extends Abstract
 
   private static final Logger LOG = Logger.getLogger(LlapDaemon.class);
 
-  private final LlapDaemonConfiguration daemonConf;
-  private final int numExecutors;
-  private final int rpcPort;
+  private final Configuration shuffleHandlerConf;
   private final LlapDaemonProtocolServerImpl server;
   private final ContainerRunnerImpl containerRunner;
-  private final String[] localDirs;
-  private final int shufflePort;
-  private final long memoryPerInstance;
-  private final long maxJvmMemory;
+  private final AtomicLong numSubmissions = new AtomicLong(0);
   private JvmPauseMonitor pauseMonitor;
   private final ObjectName llapDaemonInfoBean;
   private final LlapDaemonExecutorMetrics metrics;
 
+  // Parameters used for JMX
+  private final boolean llapIoEnabled;
+  private final long executorMemoryPerInstance;
+  private final long ioMemoryPerInstance;
+  private final int numExecutors;
+  private final long maxJvmMemory;
+  private final String[] localDirs;
+
   // TODO Not the best way to share the address
   private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>();
 
-  public LlapDaemon(LlapDaemonConfiguration daemonConf) {
+  public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
+                    boolean ioEnabled, long ioMemoryBytes, String[] localDirs, int rpcPort,
+                    int shufflePort) {
     super("LlapDaemon");
 
     printAsciiArt();
 
-    // TODO This needs to read TezConfiguration to pick up things like the heartbeat interval from config.
-    // Ideally, this would be part of llap-daemon-configuration
-    this.numExecutors = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
-        LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
-    this.rpcPort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
-        LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
-    this.daemonConf = daemonConf;
-    this.localDirs = daemonConf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS);
-    this.shufflePort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, -1);
-
-    memoryPerInstance = this.daemonConf
-        .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
-            LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l;
-    maxJvmMemory = Runtime.getRuntime().maxMemory();
+    Preconditions.checkArgument(numExecutors > 0);
+    Preconditions.checkArgument(rpcPort == 0 || (rpcPort > 1024 && rpcPort < 65536),
+        "RPC Port must be between 1025 and 65535, or 0 automatic selection");
+    Preconditions.checkArgument(localDirs != null && localDirs.length > 0,
+        "Work dirs must be specified");
+    Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536),
+        "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic selection");
+
+    this.maxJvmMemory = Runtime.getRuntime().maxMemory();
+    this.llapIoEnabled = ioEnabled;
+    this.executorMemoryPerInstance = executorMemoryBytes;
+    this.ioMemoryPerInstance = ioMemoryBytes;
+    this.numExecutors = numExecutors;
+    this.localDirs = localDirs;
 
-    LOG.info("LlapDaemon started with the following configuration: " +
+    LOG.info("Attempting to start LlapDaemonConf with the following configuration: " +
         "numExecutors=" + numExecutors +
         ", rpcListenerPort=" + rpcPort +
         ", workDirs=" + Arrays.toString(localDirs) +
         ", shufflePort=" + shufflePort +
-        ", memoryConfigured=" + memoryPerInstance +
+        ", executorMemory=" + executorMemoryBytes +
+        ", llapIoEnabled=" + ioEnabled +
+        ", llapIoCacheSize=" + ioMemoryBytes +
         ", jvmAvailableMemory=" + maxJvmMemory);
 
-    Preconditions.checkArgument(this.numExecutors > 0);
-    Preconditions.checkArgument(this.rpcPort > 1024 && this.rpcPort < 65536,
-        "RPC Port must be between 1025 and 65534");
-    Preconditions.checkArgument(this.localDirs != null && this.localDirs.length > 0,
-        "Work dirs must be specified");
-    Preconditions.checkArgument(this.shufflePort > 0, "ShufflePort must be specified");
-    Preconditions.checkState(maxJvmMemory >= memoryPerInstance,
-        "Invalid configuration. Xmx value too small. maxAvailable=" + maxJvmMemory + ", configured=" +
-            memoryPerInstance);
+    long memRequired = executorMemoryBytes + (ioEnabled ? ioMemoryBytes : 0);
+    Preconditions.checkState(maxJvmMemory >= memRequired,
+        "Invalid configuration. Xmx value too small. maxAvailable=" + maxJvmMemory +
+            ", configured(exec + io if enabled)=" +
+            memRequired);
+
+    this.shuffleHandlerConf = new Configuration(daemonConf);
+    this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort);
+    this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS,
+        StringUtils.arrayToString(localDirs));
+
+    // Less frequently set parameter, not passing in as a param.
+    int numHandlers = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
+        LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT);
+    this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort);
 
     // Initialize the metric system
     LlapMetricsSystem.initialize("LlapDaemon");
@@ -105,12 +122,11 @@ public class LlapDaemon extends Abstract
     this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors);
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
     this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this);
-    LOG.info("Started LlapMetricsSystem with displayName: " + displayName
-        + " sessionId: " + sessionId);
+    LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
+        " sessionId: " + sessionId);
 
-    this.server = new LlapDaemonProtocolServerImpl(daemonConf, this, address);
     this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, shufflePort, address,
-        memoryPerInstance, metrics);
+        executorMemoryBytes, metrics);
   }
 
   private void printAsciiArt() {
@@ -136,15 +152,18 @@ public class LlapDaemon extends Abstract
   }
 
   @Override
-  public void serviceStart() {
+  public void serviceStart() throws Exception {
+    ShuffleHandler.initializeAndStart(shuffleHandlerConf);
     server.start();
     containerRunner.start();
   }
 
-  public void serviceStop() {
+  public void serviceStop() throws Exception {
+    // TODO Shutdown LlapIO
     shutdown();
     containerRunner.stop();
     server.stop();
+    ShuffleHandler.shutdown();
   }
 
   public void shutdown() {
@@ -167,14 +186,28 @@ public class LlapDaemon extends Abstract
   public static void main(String[] args) throws Exception {
     LlapDaemon llapDaemon = null;
     try {
+      // Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml
+      // Ideally, these properties should be part of LlapDameonConf rather than HiveConf
       LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
+       int numExecutors = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+           LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+       String[] localDirs =
+           daemonConf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS);
+       int rpcPort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
+           LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+       int shufflePort = daemonConf
+           .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
+       long executorMemoryBytes = daemonConf
+           .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+               LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l;
+       long cacheMemoryBytes =
+           HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+       boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);
+       llapDaemon =
+           new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled,
+               cacheMemoryBytes, localDirs,
+               rpcPort, shufflePort);
 
-      Configuration shuffleHandlerConf = new Configuration(daemonConf);
-      shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS,
-          daemonConf.get(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS));
-      ShuffleHandler.initializeAndStart(shuffleHandlerConf);
-
-      llapDaemon = new LlapDaemon(daemonConf);
       llapDaemon.init(daemonConf);
       llapDaemon.start();
       LOG.info("Started LlapDaemon");
@@ -192,13 +225,23 @@ public class LlapDaemon extends Abstract
   @Override
   public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws
       IOException {
+    numSubmissions.incrementAndGet();
     containerRunner.submitWork(request);
   }
 
+  @VisibleForTesting
+  public long getNumSubmissions() {
+    return numSubmissions.get();
+  }
+
+  public InetSocketAddress getListenerAddress() {
+    return server.getBindAddress();
+  }
+
   // LlapDaemonMXBean methods. Will be exposed via JMX
   @Override
   public int getRpcPort() {
-    return rpcPort;
+    return server.getBindAddress().getPort();
   }
 
   @Override
@@ -208,7 +251,7 @@ public class LlapDaemon extends Abstract
 
   @Override
   public int getShufflePort() {
-    return shufflePort;
+    return ShuffleHandler.get().getPort();
   }
 
   @Override
@@ -217,8 +260,18 @@ public class LlapDaemon extends Abstract
   }
 
   @Override
-  public long getMemoryPerInstance() {
-    return memoryPerInstance;
+  public long getExecutorMemoryPerInstance() {
+    return executorMemoryPerInstance;
+  }
+
+  @Override
+  public long getIoMemoryPerInstance() {
+    return ioMemoryPerInstance;
+  }
+
+  @Override
+  public boolean isIoEnabled() {
+    return llapIoEnabled;
   }
 
   @Override

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java?rev=1665314&r1=1665313&r2=1665314&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java Mon Mar  9 18:09:46 2015
@@ -50,10 +50,22 @@ public interface LlapDaemonMXBean {
   public String getLocalDirs();
 
   /**
-   * Gets llap daemon configured memory per instance.
+   * Gets llap daemon configured executor memory per instance.
    * @return memory per instance
    */
-  public long getMemoryPerInstance();
+  public long getExecutorMemoryPerInstance();
+
+  /**
+   * Gets llap daemon configured io memory per instance.
+   * @return memory per instance
+   */
+  public long getIoMemoryPerInstance();
+
+  /**
+   * Checks if Llap IO is enabled
+   * @return true if enabled, false if not
+   */
+  public boolean isIoEnabled();
 
   /**
    * Gets max available jvm memory.

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java?rev=1665314&r1=1665313&r2=1665314&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java Mon Mar  9 18:09:46 2015
@@ -33,7 +33,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
 
 public class LlapDaemonProtocolServerImpl extends AbstractService
@@ -41,26 +40,31 @@ public class LlapDaemonProtocolServerImp
 
   private static final Log LOG = LogFactory.getLog(LlapDaemonProtocolServerImpl.class);
 
-  private final LlapDaemonConfiguration daemonConf;
+  private final int numHandlers;
   private final ContainerRunner containerRunner;
+  private final int configuredPort;
   private RPC.Server server;
   private final AtomicReference<InetSocketAddress> bindAddress;
 
 
-  public LlapDaemonProtocolServerImpl(LlapDaemonConfiguration daemonConf,
+  public LlapDaemonProtocolServerImpl(int numHandlers,
                                       ContainerRunner containerRunner,
-                                      AtomicReference<InetSocketAddress> address) {
+                                      AtomicReference<InetSocketAddress> address,
+                                      int configuredPort) {
     super("LlapDaemonProtocolServerImpl");
-    this.daemonConf = daemonConf;
+    this.numHandlers = numHandlers;
     this.containerRunner = containerRunner;
     this.bindAddress = address;
+    this.configuredPort = configuredPort;
+    LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() +
+        " with port configured to: " + configuredPort);
   }
 
   @Override
   public SubmitWorkResponseProto submitWork(RpcController controller,
                                             
LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws
       ServiceException {
-    LOG.info("DEBUG: Recevied request: " + request);
+    LOG.info("DEBUG: Received request: " + request);
     try {
       containerRunner.submitWork(request);
     } catch (IOException e) {
@@ -73,20 +77,13 @@ public class LlapDaemonProtocolServerImp
   public void serviceStart() {
     Configuration conf = getConfig();
 
-    int numHandlers = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
-        LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT);
-    int port = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
-        LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
-    InetSocketAddress addr = new InetSocketAddress(port);
-    LOG.info("Attempting to start LlapDaemonProtocol on port=" + port + ", with numHandlers=" +
-        numHandlers);
-
+    InetSocketAddress addr = new InetSocketAddress(configuredPort);
     try {
       server = createServer(LlapDaemonProtocolBlockingPB.class, addr, conf, numHandlers,
           LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this));
       server.start();
     } catch (IOException e) {
-      LOG.error("Failed to run RPC Server on port: " + port, e);
+      LOG.error("Failed to run RPC Server on port: " + configuredPort, e);
       throw new RuntimeException(e);
     }
 
@@ -94,7 +91,8 @@ public class LlapDaemonProtocolServerImp
     this.bindAddress.set(NetUtils.createSocketAddrForHost(
         serverBindAddress.getAddress().getCanonicalHostName(),
         serverBindAddress.getPort()));
-    LOG.info("Instantiated LlapDaemonProtocol at " + bindAddress);
+    LOG.info("Instantiated " + LlapDaemonProtocolBlockingPB.class.getSimpleName() + " at " +
+        bindAddress);
   }
 
   @Override
@@ -107,7 +105,7 @@ public class LlapDaemonProtocolServerImp
   @InterfaceAudience.Private
   @VisibleForTesting
   InetSocketAddress getBindAddress() {
-    return this.bindAddress.get();
+    return bindAddress.get();
   }
 
   private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java?rev=1665314&r1=1665313&r2=1665314&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java Mon Mar  9 18:09:46 2015
@@ -110,7 +110,7 @@ public class ShuffleHandler {
 
   private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
 
-  public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "tez.shuffle.handler.local-dirs";
+  public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "llap.shuffle.handler.local-dirs";
 
   public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
   public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
@@ -147,11 +147,10 @@ public class ShuffleHandler {
   private Map<String,String> userRsrc;
   private JobTokenSecretManager secretManager;
 
-  // TODO Fix this for tez.
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce_shuffle";
 
-  public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port";
+  public static final String SHUFFLE_PORT_CONFIG_KEY = "llap.shuffle.port";
   public static final int DEFAULT_SHUFFLE_PORT = 15551;
 
   // TODO Change configs to remove mapreduce references.
@@ -287,7 +286,7 @@ public class ShuffleHandler {
     port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
     conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
     pipelineFact.SHUFFLE.setPort(port);
-    LOG.info("TezShuffleHandler" + " listening on port " + port);
+    LOG.info("LlapShuffleHandler" + " listening on port " + port);
   }
 
   public static void initializeAndStart(Configuration conf) throws Exception {
@@ -298,6 +297,12 @@ public class ShuffleHandler {
     }
   }
 
+  public static void shutdown() throws Exception {
+    if (INSTANCE != null) {
+      INSTANCE.stop();
+    }
+  }
+
   public static ShuffleHandler get() {
     Preconditions.checkState(started.get(), "ShuffleHandler must be started before invoking started");
     return INSTANCE;
@@ -350,6 +355,10 @@ public class ShuffleHandler {
     return jt;
   }
 
+  public int getPort() {
+    return port;
+  }
+
   public void registerApplication(String applicationIdString, Token<JobTokenIdentifier> appToken,
                                   String user) {
     Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true));

Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java?rev=1665314&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java Mon Mar  9 18:09:46 2015
@@ -0,0 +1,170 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+public class MiniLlapCluster extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(MiniLlapCluster.class);
+
+  private final File testWorkDir;
+  private final long execBytesPerService;
+  private final boolean llapIoEnabled;
+  private final long ioBytesPerService;
+  private final int numExecutorsPerService;
+  private final String[] localDirs;
+  private final Configuration clusterSpecificConfiguration = new Configuration(false);
+
+  private LlapDaemon llapDaemon;
+
+  public static MiniLlapCluster create(String clusterName, int numExecutorsPerService,
+                                       long execBytePerService, boolean llapIoEnabled,
+                                       long ioBytesPerService, int numLocalDirs) {
+    return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService,
+        llapIoEnabled, ioBytesPerService, numLocalDirs);
+  }
+
+  // TODO Add support for multiple instances
+  private MiniLlapCluster(String clusterName, int numExecutorsPerService, long execMemoryPerService,
+                          boolean llapIoEnabled, long ioBytesPerService, int numLocalDirs) {
+    super(clusterName + "_" + MiniLlapCluster.class.getSimpleName());
+    Preconditions.checkArgument(numExecutorsPerService > 0);
+    Preconditions.checkArgument(execMemoryPerService > 0);
+    Preconditions.checkArgument(numLocalDirs > 0);
+    String clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName();
+    File targetWorkDir = new File("target", clusterNameTrimmed);
+    try {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(targetWorkDir.getAbsolutePath()), true);
+    } catch (Exception e) {
+      LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e);
+      throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e);
+    }
+
+    if (Shell.WINDOWS) {
+      // The test working directory can exceed the maximum path length supported
+      // by some Windows APIs and cmd.exe (260 characters).  To work around this,
+      // create a symlink in temporary storage with a much shorter path,
+      // targeting the full path to the test working directory.  Then, use the
+      // symlink as the test working directory.
+      String targetPath = targetWorkDir.getAbsolutePath();
+      File link = new File(System.getProperty("java.io.tmpdir"),
+          String.valueOf(System.currentTimeMillis()));
+      String linkPath = link.getAbsolutePath();
+
+      try {
+        FileContext.getLocalFSFileContext().delete(new Path(linkPath), true);
+      } catch (IOException e) {
+        throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e);
+      }
+
+      // Guarantee target exists before creating symlink.
+      targetWorkDir.mkdirs();
+
+      Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+          Shell.getSymlinkCommand(targetPath, linkPath));
+      try {
+        shexec.execute();
+      } catch (IOException e) {
+        throw new YarnRuntimeException(String.format(
+            "failed to create symlink from %s to %s, shell output: %s", linkPath,
+            targetPath, shexec.getOutput()), e);
+      }
+
+      this.testWorkDir = link;
+    } else {
+      this.testWorkDir = targetWorkDir;
+    }
+    this.numExecutorsPerService = numExecutorsPerService;
+    this.execBytesPerService = execMemoryPerService;
+    this.llapIoEnabled = llapIoEnabled;
+    this.ioBytesPerService = ioBytesPerService;
+
+    // Setup Local Dirs
+    localDirs = new String[numLocalDirs];
+    for (int i = 0 ; i < numLocalDirs ; i++) {
+      File f = new File(testWorkDir, "localDir");
+      f.mkdirs();
+      LOG.info("Created localDir: " + f.getAbsolutePath());
+      localDirs[i] = f.getAbsolutePath();
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
+        ioBytesPerService, localDirs, 0, 0);
+    llapDaemon.init(conf);
+  }
+
+  @Override
+  public void serviceStart() {
+    llapDaemon.start();
+
+    clusterSpecificConfiguration.set(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS,
+        getServiceAddress().getHostName());
+    clusterSpecificConfiguration.setInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
+        getServiceAddress().getPort());
+
+    clusterSpecificConfiguration.setInt(
+        LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+        numExecutorsPerService);
+    clusterSpecificConfiguration.setLong(
+        LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, execBytesPerService);
+  }
+
+  @Override
+  public void serviceStop() {
+    if (llapDaemon != null) {
+      llapDaemon.stop();
+      llapDaemon = null;
+    }
+  }
+
+  private InetSocketAddress getServiceAddress() {
+    Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
+    return llapDaemon.getListenerAddress();
+  }
+
+  private int getShufflePort() {
+    Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
+    return llapDaemon.getShufflePort();
+  }
+
+  public Configuration getClusterSpecificConfiguration() {
+    Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
+    return clusterSpecificConfiguration;
+  }
+
+  // Mainly for verification
+  public long getNumSubmissions() {
+    return llapDaemon.getNumSubmissions();
+  }
+
+}

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java?rev=1665314&r1=1665313&r2=1665314&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java Mon Mar  9 18:09:46 2015
@@ -33,9 +33,13 @@ public class TestLlapDaemonProtocolServe
   @Test(timeout = 10000)
   public void test() throws ServiceException {
     LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
+    int rpcPort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
+        LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+    int numHandlers = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS,
+        LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT);
     LlapDaemonProtocolServerImpl server =
-        new LlapDaemonProtocolServerImpl(daemonConf, mock(ContainerRunner.class),
-            new AtomicReference<InetSocketAddress>());
+        new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
+            new AtomicReference<InetSocketAddress>(), rpcPort);
 
     try {
       server.init(new Configuration());


Reply via email to