Author: sseth Date: Mon Mar 9 18:09:46 2015 New Revision: 1665314 URL: http://svn.apache.org/r1665314 Log: HIVE-9775. LLAP: Add a MiniLLAPCluster for tests. (Siddharth Seth)
Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java?rev=1665314&r1=1665313&r2=1665314&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java Mon Mar 9 18:09:46 2015 @@ -17,11 +17,14 @@ package org.apache.hadoop.hive.llap.daem import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.management.ObjectName; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; @@ -33,6 +36,7 @@ import org.apache.hadoop.hive.llap.shuff import org.apache.hadoop.metrics2.util.MBeans; import 
org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Logger; import com.google.common.base.Joiner; @@ -42,59 +46,72 @@ public class LlapDaemon extends Abstract private static final Logger LOG = Logger.getLogger(LlapDaemon.class); - private final LlapDaemonConfiguration daemonConf; - private final int numExecutors; - private final int rpcPort; + private final Configuration shuffleHandlerConf; private final LlapDaemonProtocolServerImpl server; private final ContainerRunnerImpl containerRunner; - private final String[] localDirs; - private final int shufflePort; - private final long memoryPerInstance; - private final long maxJvmMemory; + private final AtomicLong numSubmissions = new AtomicLong(0); private JvmPauseMonitor pauseMonitor; private final ObjectName llapDaemonInfoBean; private final LlapDaemonExecutorMetrics metrics; + // Parameters used for JMX + private final boolean llapIoEnabled; + private final long executorMemoryPerInstance; + private final long ioMemoryPerInstance; + private final int numExecutors; + private final long maxJvmMemory; + private final String[] localDirs; + // TODO Not the best way to share the address private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>(); - public LlapDaemon(LlapDaemonConfiguration daemonConf) { + public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, + boolean ioEnabled, long ioMemoryBytes, String[] localDirs, int rpcPort, + int shufflePort) { super("LlapDaemon"); printAsciiArt(); - // TODO This needs to read TezConfiguration to pick up things like the heartbeat interval from config. 
- // Ideally, this would be part of llap-daemon-configuration - this.numExecutors = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); - this.rpcPort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT, - LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - this.daemonConf = daemonConf; - this.localDirs = daemonConf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS); - this.shufflePort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, -1); - - memoryPerInstance = this.daemonConf - .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, - LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l; - maxJvmMemory = Runtime.getRuntime().maxMemory(); + Preconditions.checkArgument(numExecutors > 0); + Preconditions.checkArgument(rpcPort == 0 || (rpcPort > 1024 && rpcPort < 65536), + "RPC Port must be between 1025 and 65535, or 0 automatic selection"); + Preconditions.checkArgument(localDirs != null && localDirs.length > 0, + "Work dirs must be specified"); + Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536), + "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic selection"); + + this.maxJvmMemory = Runtime.getRuntime().maxMemory(); + this.llapIoEnabled = ioEnabled; + this.executorMemoryPerInstance = executorMemoryBytes; + this.ioMemoryPerInstance = ioMemoryBytes; + this.numExecutors = numExecutors; + this.localDirs = localDirs; - LOG.info("LlapDaemon started with the following configuration: " + + LOG.info("Attempting to start LlapDaemonConf with the following configuration: " + "numExecutors=" + numExecutors + ", rpcListenerPort=" + rpcPort + ", workDirs=" + Arrays.toString(localDirs) + ", shufflePort=" + shufflePort + - ", memoryConfigured=" + memoryPerInstance + + ", executorMemory=" + executorMemoryBytes + + ", llapIoEnabled=" + ioEnabled + + 
", llapIoCacheSize=" + ioMemoryBytes + ", jvmAvailableMemory=" + maxJvmMemory); - Preconditions.checkArgument(this.numExecutors > 0); - Preconditions.checkArgument(this.rpcPort > 1024 && this.rpcPort < 65536, - "RPC Port must be between 1025 and 65534"); - Preconditions.checkArgument(this.localDirs != null && this.localDirs.length > 0, - "Work dirs must be specified"); - Preconditions.checkArgument(this.shufflePort > 0, "ShufflePort must be specified"); - Preconditions.checkState(maxJvmMemory >= memoryPerInstance, - "Invalid configuration. Xmx value too small. maxAvailable=" + maxJvmMemory + ", configured=" + - memoryPerInstance); + long memRequired = executorMemoryBytes + (ioEnabled ? ioMemoryBytes : 0); + Preconditions.checkState(maxJvmMemory >= memRequired, + "Invalid configuration. Xmx value too small. maxAvailable=" + maxJvmMemory + + ", configured(exec + io if enabled)=" + + memRequired); + + this.shuffleHandlerConf = new Configuration(daemonConf); + this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort); + this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, + StringUtils.arrayToString(localDirs)); + + // Less frequently set parameter, not passing in as a param. 
+ int numHandlers = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS, + LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT); + this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort); // Initialize the metric system LlapMetricsSystem.initialize("LlapDaemon"); @@ -105,12 +122,11 @@ public class LlapDaemon extends Abstract this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this); - LOG.info("Started LlapMetricsSystem with displayName: " + displayName - + " sessionId: " + sessionId); + LOG.info("Started LlapMetricsSystem with displayName: " + displayName + + " sessionId: " + sessionId); - this.server = new LlapDaemonProtocolServerImpl(daemonConf, this, address); this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, shufflePort, address, - memoryPerInstance, metrics); + executorMemoryBytes, metrics); } private void printAsciiArt() { @@ -136,15 +152,18 @@ public class LlapDaemon extends Abstract } @Override - public void serviceStart() { + public void serviceStart() throws Exception { + ShuffleHandler.initializeAndStart(shuffleHandlerConf); server.start(); containerRunner.start(); } - public void serviceStop() { + public void serviceStop() throws Exception { + // TODO Shutdown LlapIO shutdown(); containerRunner.stop(); server.stop(); + ShuffleHandler.shutdown(); } public void shutdown() { @@ -167,14 +186,28 @@ public class LlapDaemon extends Abstract public static void main(String[] args) throws Exception { LlapDaemon llapDaemon = null; try { + // Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml + // Ideally, these properties should be part of LlapDaemonConf rather than HiveConf LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration(); + int numExecutors = 
daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + String[] localDirs = + daemonConf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS); + int rpcPort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT, + LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + int shufflePort = daemonConf + .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + long executorMemoryBytes = daemonConf + .getInt(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l; + long cacheMemoryBytes = + HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE); + boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED); + llapDaemon = + new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, + cacheMemoryBytes, localDirs, + rpcPort, shufflePort); - Configuration shuffleHandlerConf = new Configuration(daemonConf); - shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, - daemonConf.get(LlapDaemonConfiguration.LLAP_DAEMON_WORK_DIRS)); - ShuffleHandler.initializeAndStart(shuffleHandlerConf); - - llapDaemon = new LlapDaemon(daemonConf); llapDaemon.init(daemonConf); llapDaemon.start(); LOG.info("Started LlapDaemon"); @@ -192,13 +225,23 @@ public class LlapDaemon extends Abstract @Override public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws IOException { + numSubmissions.incrementAndGet(); containerRunner.submitWork(request); } + @VisibleForTesting + public long getNumSubmissions() { + return numSubmissions.get(); + } + + public InetSocketAddress getListenerAddress() { + return server.getBindAddress(); + } + // LlapDaemonMXBean methods. 
Will be exposed via JMX @Override public int getRpcPort() { - return rpcPort; + return server.getBindAddress().getPort(); } @Override @@ -208,7 +251,7 @@ public class LlapDaemon extends Abstract @Override public int getShufflePort() { - return shufflePort; + return ShuffleHandler.get().getPort(); } @Override @@ -217,8 +260,18 @@ public class LlapDaemon extends Abstract } @Override - public long getMemoryPerInstance() { - return memoryPerInstance; + public long getExecutorMemoryPerInstance() { + return executorMemoryPerInstance; + } + + @Override + public long getIoMemoryPerInstance() { + return ioMemoryPerInstance; + } + + @Override + public boolean isIoEnabled() { + return llapIoEnabled; } @Override Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java?rev=1665314&r1=1665313&r2=1665314&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java Mon Mar 9 18:09:46 2015 @@ -50,10 +50,22 @@ public interface LlapDaemonMXBean { public String getLocalDirs(); /** - * Gets llap daemon configured memory per instance. + * Gets llap daemon configured executor memory per instance. * @return memory per instance */ - public long getMemoryPerInstance(); + public long getExecutorMemoryPerInstance(); + + /** + * Gets llap daemon configured io memory per instance. + * @return memory per instance + */ + public long getIoMemoryPerInstance(); + + /** + * Checks if Llap IO is enabled + * @return true if enabled, false if not + */ + public boolean isIoEnabled(); /** * Gets max available jvm memory. 
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java?rev=1665314&r1=1665313&r2=1665314&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java Mon Mar 9 18:09:46 2015 @@ -33,7 +33,6 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; public class LlapDaemonProtocolServerImpl extends AbstractService @@ -41,26 +40,31 @@ public class LlapDaemonProtocolServerImp private static final Log LOG = LogFactory.getLog(LlapDaemonProtocolServerImpl.class); - private final LlapDaemonConfiguration daemonConf; + private final int numHandlers; private final ContainerRunner containerRunner; + private final int configuredPort; private RPC.Server server; private final AtomicReference<InetSocketAddress> bindAddress; - public LlapDaemonProtocolServerImpl(LlapDaemonConfiguration daemonConf, + public LlapDaemonProtocolServerImpl(int numHandlers, ContainerRunner containerRunner, - AtomicReference<InetSocketAddress> address) { + AtomicReference<InetSocketAddress> address, + int configuredPort) { super("LlapDaemonProtocolServerImpl"); - this.daemonConf = daemonConf; + this.numHandlers = numHandlers; this.containerRunner = containerRunner; this.bindAddress = address; + this.configuredPort = configuredPort; + LOG.info("Creating: " + 
LlapDaemonProtocolServerImpl.class.getSimpleName() + + " with port configured to: " + configuredPort); } @Override public SubmitWorkResponseProto submitWork(RpcController controller, LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws ServiceException { - LOG.info("DEBUG: Recevied request: " + request); + LOG.info("DEBUG: Received request: " + request); try { containerRunner.submitWork(request); } catch (IOException e) { @@ -73,20 +77,13 @@ public class LlapDaemonProtocolServerImp public void serviceStart() { Configuration conf = getConfig(); - int numHandlers = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS, - LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT); - int port = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT, - LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - InetSocketAddress addr = new InetSocketAddress(port); - LOG.info("Attempting to start LlapDaemonProtocol on port=" + port + ", with numHandlers=" + - numHandlers); - + InetSocketAddress addr = new InetSocketAddress(configuredPort); try { server = createServer(LlapDaemonProtocolBlockingPB.class, addr, conf, numHandlers, LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this)); server.start(); } catch (IOException e) { - LOG.error("Failed to run RPC Server on port: " + port, e); + LOG.error("Failed to run RPC Server on port: " + configuredPort, e); throw new RuntimeException(e); } @@ -94,7 +91,8 @@ public class LlapDaemonProtocolServerImp this.bindAddress.set(NetUtils.createSocketAddrForHost( serverBindAddress.getAddress().getCanonicalHostName(), serverBindAddress.getPort())); - LOG.info("Instantiated LlapDaemonProtocol at " + bindAddress); + LOG.info("Instantiated " + LlapDaemonProtocolBlockingPB.class.getSimpleName() + " at " + + bindAddress); } @Override @@ -107,7 +105,7 @@ public class LlapDaemonProtocolServerImp @InterfaceAudience.Private @VisibleForTesting InetSocketAddress getBindAddress() { - return 
this.bindAddress.get(); + return bindAddress.get(); } private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java?rev=1665314&r1=1665313&r2=1665314&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java Mon Mar 9 18:09:46 2015 @@ -110,7 +110,7 @@ public class ShuffleHandler { private static final Log LOG = LogFactory.getLog(ShuffleHandler.class); - public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "tez.shuffle.handler.local-dirs"; + public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "llap.shuffle.handler.local-dirs"; public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache"; public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; @@ -147,11 +147,10 @@ public class ShuffleHandler { private Map<String,String> userRsrc; private JobTokenSecretManager secretManager; - // TODO Fix this for tez. public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce_shuffle"; - public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port"; + public static final String SHUFFLE_PORT_CONFIG_KEY = "llap.shuffle.port"; public static final int DEFAULT_SHUFFLE_PORT = 15551; // TODO Change configs to remove mapreduce references. 
@@ -287,7 +286,7 @@ public class ShuffleHandler { port = ((InetSocketAddress)ch.getLocalAddress()).getPort(); conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port)); pipelineFact.SHUFFLE.setPort(port); - LOG.info("TezShuffleHandler" + " listening on port " + port); + LOG.info("LlapShuffleHandler" + " listening on port " + port); } public static void initializeAndStart(Configuration conf) throws Exception { @@ -298,6 +297,12 @@ public class ShuffleHandler { } } + public static void shutdown() throws Exception { + if (INSTANCE != null) { + INSTANCE.stop(); + } + } + public static ShuffleHandler get() { Preconditions.checkState(started.get(), "ShuffleHandler must be started before invoking started"); return INSTANCE; @@ -350,6 +355,10 @@ public class ShuffleHandler { return jt; } + public int getPort() { + return port; + } + public void registerApplication(String applicationIdString, Token<JobTokenIdentifier> appToken, String user) { Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true)); Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java?rev=1665314&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java (added) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java Mon Mar 9 18:09:46 2015 @@ -0,0 +1,170 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +public class MiniLlapCluster extends AbstractService { + private static final Log LOG = LogFactory.getLog(MiniLlapCluster.class); + + private final File testWorkDir; + private final long execBytesPerService; + private final boolean llapIoEnabled; + private final long ioBytesPerService; + private final int numExecutorsPerService; + private final String[] localDirs; + private final Configuration clusterSpecificConfiguration = new Configuration(false); + + private LlapDaemon llapDaemon; + + public static MiniLlapCluster create(String clusterName, int numExecutorsPerService, + long execBytePerService, boolean llapIoEnabled, + long ioBytesPerService, int numLocalDirs) { + return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService, + llapIoEnabled, ioBytesPerService, numLocalDirs); + } + + // TODO Add support for multiple instances + private MiniLlapCluster(String clusterName, int 
numExecutorsPerService, long execMemoryPerService, + boolean llapIoEnabled, long ioBytesPerService, int numLocalDirs) { + super(clusterName + "_" + MiniLlapCluster.class.getSimpleName()); + Preconditions.checkArgument(numExecutorsPerService > 0); + Preconditions.checkArgument(execMemoryPerService > 0); + Preconditions.checkArgument(numLocalDirs > 0); + String clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName(); + File targetWorkDir = new File("target", clusterNameTrimmed); + try { + FileContext.getLocalFSFileContext().delete( + new Path(targetWorkDir.getAbsolutePath()), true); + } catch (Exception e) { + LOG.warn("Could not cleanup test workDir: " + targetWorkDir, e); + throw new RuntimeException("Could not cleanup test workDir: " + targetWorkDir, e); + } + + if (Shell.WINDOWS) { + // The test working directory can exceed the maximum path length supported + // by some Windows APIs and cmd.exe (260 characters). To work around this, + // create a symlink in temporary storage with a much shorter path, + // targeting the full path to the test working directory. Then, use the + // symlink as the test working directory. + String targetPath = targetWorkDir.getAbsolutePath(); + File link = new File(System.getProperty("java.io.tmpdir"), + String.valueOf(System.currentTimeMillis())); + String linkPath = link.getAbsolutePath(); + + try { + FileContext.getLocalFSFileContext().delete(new Path(linkPath), true); + } catch (IOException e) { + throw new YarnRuntimeException("could not cleanup symlink: " + linkPath, e); + } + + // Guarantee target exists before creating symlink. 
+ targetWorkDir.mkdirs(); + + Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor( + Shell.getSymlinkCommand(targetPath, linkPath)); + try { + shexec.execute(); + } catch (IOException e) { + throw new YarnRuntimeException(String.format( + "failed to create symlink from %s to %s, shell output: %s", linkPath, + targetPath, shexec.getOutput()), e); + } + + this.testWorkDir = link; + } else { + this.testWorkDir = targetWorkDir; + } + this.numExecutorsPerService = numExecutorsPerService; + this.execBytesPerService = execMemoryPerService; + this.llapIoEnabled = llapIoEnabled; + this.ioBytesPerService = ioBytesPerService; + + // Setup Local Dirs + localDirs = new String[numLocalDirs]; + for (int i = 0 ; i < numLocalDirs ; i++) { + File f = new File(testWorkDir, "localDir"); + f.mkdirs(); + LOG.info("Created localDir: " + f.getAbsolutePath()); + localDirs[i] = f.getAbsolutePath(); + } + } + + @Override + public void serviceInit(Configuration conf) { + llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, + ioBytesPerService, localDirs, 0, 0); + llapDaemon.init(conf); + } + + @Override + public void serviceStart() { + llapDaemon.start(); + + clusterSpecificConfiguration.set(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS, + getServiceAddress().getHostName()); + clusterSpecificConfiguration.setInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT, + getServiceAddress().getPort()); + + clusterSpecificConfiguration.setInt( + LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + numExecutorsPerService); + clusterSpecificConfiguration.setLong( + LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, execBytesPerService); + } + + @Override + public void serviceStop() { + if (llapDaemon != null) { + llapDaemon.stop(); + llapDaemon = null; + } + } + + private InetSocketAddress getServiceAddress() { + Preconditions.checkState(getServiceState() == Service.STATE.STARTED); + return llapDaemon.getListenerAddress(); + } + + private 
int getShufflePort() { + Preconditions.checkState(getServiceState() == Service.STATE.STARTED); + return llapDaemon.getShufflePort(); + } + + public Configuration getClusterSpecificConfiguration() { + Preconditions.checkState(getServiceState() == Service.STATE.STARTED); + return clusterSpecificConfiguration; + } + + // Mainly for verification + public long getNumSubmissions() { + return llapDaemon.getNumSubmissions(); + } + +} Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java?rev=1665314&r1=1665313&r2=1665314&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java Mon Mar 9 18:09:46 2015 @@ -33,9 +33,13 @@ public class TestLlapDaemonProtocolServe @Test(timeout = 10000) public void test() throws ServiceException { LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration(); + int rpcPort = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT, + LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + int numHandlers = daemonConf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS, + LlapDaemonConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT); LlapDaemonProtocolServerImpl server = - new LlapDaemonProtocolServerImpl(daemonConf, mock(ContainerRunner.class), - new AtomicReference<InetSocketAddress>()); + new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class), + new AtomicReference<InetSocketAddress>(), rpcPort); try { server.init(new Configuration());