Repository: hive
Updated Branches:
  refs/heads/llap 8256e59a9 -> 785ec8e75


HIVE-10960: LLAP: Allow finer control of startup options for LLAP (gopalv)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/785ec8e7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/785ec8e7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/785ec8e7

Branch: refs/heads/llap
Commit: 785ec8e75878170427a2a2c50f035d1d7cd37650
Parents: 8256e59
Author: Gopal V <gop...@apache.org>
Authored: Mon Jun 8 11:39:08 2015 -0700
Committer: Gopal V <gop...@apache.org>
Committed: Mon Jun 8 11:39:08 2015 -0700

----------------------------------------------------------------------
 .../llap/configuration/LlapConfiguration.java   |  3 ++
 llap-server/bin/runLlapDaemon.sh                |  2 +-
 .../hive/llap/cli/LlapOptionsProcessor.java     | 53 ++++++++++++++++++--
 .../hadoop/hive/llap/cli/LlapServiceDriver.java | 52 ++++++++++++++++++-
 .../hive/llap/daemon/impl/LlapDaemon.java       | 40 +++++++++++++--
 llap-server/src/main/resources/package.py       | 29 +++++------
 .../hive/llap/daemon/MiniLlapCluster.java       | 15 +++---
 7 files changed, 164 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/785ec8e7/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index b57ae80..dd24661 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -44,6 +44,9 @@ public class LlapConfiguration extends Configuration {
  public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port";
  public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551;
 
+  public static final String LLAP_DAEMON_YARN_CONTAINER_MB = LLAP_DAEMON_PREFIX + "yarn.container.mb";
+  public static final int LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT = -1;
+
  public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled";
  public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/785ec8e7/llap-server/bin/runLlapDaemon.sh
----------------------------------------------------------------------
diff --git a/llap-server/bin/runLlapDaemon.sh b/llap-server/bin/runLlapDaemon.sh
index dd59439..bd14fe8 100755
--- a/llap-server/bin/runLlapDaemon.sh
+++ b/llap-server/bin/runLlapDaemon.sh
@@ -48,7 +48,7 @@ shift
 
 JAVA=$JAVA_HOME/bin/java
 LOG_LEVEL_DEFAULT="INFO,console"
-JAVA_OPTS_BASE="-server -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps"
+JAVA_OPTS_BASE="-server -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps"
 
 # CLASSPATH initially contains $HADOOP_CONF_DIR & $YARN_CONF_DIR
 if [ ! -d "$HADOOP_CONF_DIR" ]; then

http://git-wip-us.apache.org/repos/asf/hive/blob/785ec8e7/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
index 67a099a..211cd8b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
@@ -29,6 +29,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
 
 public class LlapOptionsProcessor {
 
@@ -36,9 +37,13 @@ public class LlapOptionsProcessor {
     private int instances = 0;
     private String directory = null;
     private String name;
+    private int executors;
+    private long cache;
+    private long size;
+    private long xmx;
 
-    public LlapOptions(String name, int instances, String directory)
-        throws ParseException {
+    public LlapOptions(String name, int instances, String directory, int 
executors, long cache,
+        long size, long xmx) throws ParseException {
       if (instances <= 0) {
         throw new ParseException("Invalid configuration: " + instances
             + " (should be greater than 0)");
@@ -46,6 +51,10 @@ public class LlapOptionsProcessor {
       this.instances = instances;
       this.directory = directory;
       this.name = name;
+      this.executors = executors;
+      this.cache = cache;
+      this.size = size;
+      this.xmx = xmx;
     }
 
     public String getName() {
@@ -59,6 +68,22 @@ public class LlapOptionsProcessor {
     public String getDirectory() {
       return directory;
     }
+
+    public int getExecutors() {
+      return executors;
+    }
+
+    public long getCache() {
+      return cache;
+    }
+
+    public long getSize() {
+      return size;
+    }
+
+    public long getXmx() {
+      return xmx;
+    }
   }
 
   protected static final Log l4j = 
LogFactory.getLog(LlapOptionsProcessor.class.getName());
@@ -88,10 +113,26 @@ public class LlapOptionsProcessor {
     
options.addOption(OptionBuilder.hasArg().withArgName("chaosmonkey").withLongOpt("chaosmonkey")
         .withDescription("chaosmonkey interval").create('m'));
 
+    
options.addOption(OptionBuilder.hasArg().withArgName("executors").withLongOpt("executors")
+        .withDescription("executor per instance").create('e'));
+
+    
options.addOption(OptionBuilder.hasArg().withArgName("cache").withLongOpt("cache")
+        .withDescription("cache size per instance").create('c'));
+
+    
options.addOption(OptionBuilder.hasArg().withArgName("size").withLongOpt("size")
+        .withDescription("container size per instance").create('s'));
+
+    
options.addOption(OptionBuilder.hasArg().withArgName("xmx").withLongOpt("xmx")
+        .withDescription("working memory size").create('w'));
+
     // [-H|--help]
     options.addOption(new Option("H", "help", false, "Print help 
information"));
   }
 
+  private static long parseSuffixed(String value) {
+    return StringUtils.TraditionalBinaryPrefix.string2long(value);
+  }
+
   public LlapOptions processOptions(String argv[]) throws ParseException {
     commandLine = new GnuParser().parse(options, argv);
     if (commandLine.hasOption('H') || false == 
commandLine.hasOption("instances")) {
@@ -104,9 +145,15 @@ public class LlapOptionsProcessor {
     String directory = commandLine.getOptionValue("directory");
 
     String name = commandLine.getOptionValue("name", null);
+
+    int executors = Integer.parseInt(commandLine.getOptionValue("executors", 
"-1"));
+    long cache = parseSuffixed(commandLine.getOptionValue("cache", "-1"));
+    long size = parseSuffixed(commandLine.getOptionValue("size", "-1"));
+    long xmx = parseSuffixed(commandLine.getOptionValue("xmx", "-1"));
+
     // loglevel, chaosmonkey & args are parsed by the python processor
 
-    return new LlapOptions(name, instances, directory);
+    return new LlapOptions(name, instances, directory, executors, cache, size, 
xmx);
   }
 
   private void printUsage() {

http://git-wip-us.apache.org/repos/asf/hive/blob/785ec8e7/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 2ee3dd8..b3d155b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.json.JSONArray;
 import org.json.JSONObject;
 
+import com.google.common.base.Preconditions;
+
 public class LlapServiceDriver {
 
   protected static final Log LOG = 
LogFactory.getLog(LlapServiceDriver.class.getName());
@@ -119,11 +121,53 @@ public class LlapServiceDriver {
     conf.reloadConfiguration();
 
     if (options.getName() != null) {
-      // update service registry configs - caveat: this has nothing to do with the actual settings as read by the AM
-      // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between instances
+      // update service registry configs - caveat: this has nothing to do with the actual settings
+      // as read by the AM
+      // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
+      // instances
       conf.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, "@" + 
options.getName());
     }
 
+    if (options.getSize() != -1) {
+      if (options.getCache() != -1) {
+        Preconditions.checkArgument(options.getCache() < options.getSize(),
+            "Cache has to be smaller than the container sizing");
+      }
+      if (options.getXmx() != -1) {
+        Preconditions.checkArgument(options.getXmx() < options.getSize(),
+            "Working memory has to be smaller than the container sizing");
+      }
+      if (HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT)) {
+        Preconditions.checkArgument(options.getXmx() + options.getCache() < 
options.getSize(),
+            "Working memory + cache has to be smaller than the containing 
sizing ");
+      }
+    }
+
+    final long minAlloc = 
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+    if (options.getSize() != -1) {
+      final long containerSize = options.getSize() / (1024 * 1024);
+      Preconditions.checkArgument(containerSize >= minAlloc,
+          "Container size should be greater than minimum allocation(%s)", 
minAlloc + "m");
+      conf.setLong(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, 
containerSize);
+    }
+
+    if (options.getExecutors() != -1) {
+      conf.setLong(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, 
options.getExecutors());
+      // TODO: vcpu settings - possibly when DRFA works right
+    }
+
+    if (options.getCache() != -1) {
+      conf.setLong(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, 
options.getCache());
+    }
+
+    if (options.getXmx() != -1) {
+      // Needs more explanation here
+      // Xmx is not the max heap value in JDK8
+      // You need to subtract 50% of the survivor fraction from this, to get 
actual usable memory before it goes into GC 
+      conf.setLong(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, 
(long)(options.getXmx())
+          / (1024 * 1024));
+    }
+
     URL logger = conf.getResource("llap-daemon-log4j.properties");
 
     if (null == logger) {
@@ -182,6 +226,10 @@ public class LlapServiceDriver {
     // extract configs for processing by the python fragments in Slider
     JSONObject configs = new JSONObject();
 
+    configs.put(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, conf.getInt(
+        LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB,
+        LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT));
+
     configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
         HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
 

http://git-wip-us.apache.org/repos/asf/hive/blob/785ec8e7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 7b53e63..75377d4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -15,6 +15,10 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Set;
@@ -48,6 +52,7 @@ import org.apache.hive.common.util.ShutdownHookManager;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +82,7 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
   private final AtomicReference<InetSocketAddress> address = new 
AtomicReference<InetSocketAddress>();
 
   public LlapDaemon(Configuration daemonConf, int numExecutors, long 
executorMemoryBytes,
-      boolean ioEnabled, long ioMemoryBytes, String[] localDirs, int rpcPort,
+      boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] 
localDirs, int rpcPort,
       int shufflePort) {
     super("LlapDaemon");
 
@@ -91,7 +96,7 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
     Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && 
shufflePort < 65536),
         "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic 
selection");
 
-    this.maxJvmMemory = Runtime.getRuntime().maxMemory();
+    this.maxJvmMemory = getTotalHeapSize();
     this.llapIoEnabled = ioEnabled;
     this.executorMemoryPerInstance = executorMemoryBytes;
     this.ioMemoryPerInstance = ioMemoryBytes;
@@ -111,12 +116,15 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
         ", shufflePort=" + shufflePort +
         ", executorMemory=" + executorMemoryBytes +
         ", llapIoEnabled=" + ioEnabled +
+        ", llapIoCacheIsDirect=" + isDirectCache +
         ", llapIoCacheSize=" + ioMemoryBytes +
         ", jvmAvailableMemory=" + maxJvmMemory +
         ", waitQueueSize= " + waitQueueSize +
         ", enablePreemption= " + enablePreemption);
 
-    long memRequired = executorMemoryBytes + (ioEnabled ? ioMemoryBytes : 0);
+    long memRequired =
+        executorMemoryBytes + (ioEnabled && isDirectCache == false ? 
ioMemoryBytes : 0);
+    // TODO: this check is somewhat bogus as the maxJvmMemory != Xmx 
parameters (see annotation in LlapServiceDriver)
     Preconditions.checkState(maxJvmMemory >= memRequired,
         "Invalid configuration. Xmx value too small. maxAvailable=" + 
maxJvmMemory +
             ", configured(exec + io if enabled)=" +
@@ -167,6 +175,28 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
     addIfService(webServices);
   }
 
+  private long getTotalHeapSize() {
+    // runtime.getMax() gives a very different number from the actual Xmx 
sizing.
+    // you can iterate through the
+    // 
http://docs.oracle.com/javase/7/docs/api/java/lang/management/MemoryPoolMXBean.html
+    // from java.lang.management to figure this out, but the hard-coded params 
in the llap run.sh
+    // result in 89% usable heap (-XX:NewRatio=8) + a survivor region which is 
technically not
+    // in the usable space.
+
+    long total = 0;
+    MemoryMXBean m = ManagementFactory.getMemoryMXBean();
+    for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
+      long sz = mp.getUsage().getMax();
+      if (mp.getName().contains("Survivor")) {
+        sz *= 2; // there are 2 survivor spaces
+      }
+      if (mp.getType().equals(MemoryType.HEAP)) {
+        total += sz;
+      }
+    }
+    return total;
+  }
+
   private void printAsciiArt() {
     final String asciiArt = "" +
         "$$\\       $$\\        $$$$$$\\  $$$$$$$\\\n" +
@@ -239,9 +269,11 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
               LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 
1024l * 1024l;
       long cacheMemoryBytes =
           HiveConf.getLongVar(daemonConf, 
HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+      boolean isDirectCache =
+          HiveConf.getBoolVar(daemonConf, 
HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
       boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, 
HiveConf.ConfVars.LLAP_IO_ENABLED);
       llapDaemon =
-          new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, 
llapIoEnabled,
+          new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, 
llapIoEnabled, isDirectCache,
               cacheMemoryBytes, localDirs,
               rpcPort, shufflePort);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/785ec8e7/llap-server/src/main/resources/package.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 39242e2..ae501cf 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -15,26 +15,27 @@ class LlapResource(object):
        def __init__(self, config):
                self.memory = config["llap.daemon.memory.per.instance.mb"]
                self.cores = config["llap.daemon.vcpus.per.instance"]
+               size = config["llap.daemon.yarn.container.mb"]
                # convert to Mb
                self.cache = config["hive.llap.io.cache.orc.size"] / 
(1024*1024.0)
                self.direct = config["hive.llap.io.cache.direct"]
                self.min_mb = -1
                self.min_cores = -1
-               # compute heap
-               h = max(1.2*self.memory, self.memory + 256) 
+               # compute heap + cache as final Xmx
+               h = self.memory 
                if (not self.direct):
                        h += self.cache
-               c = max(h*1.2, h + 128)
-               if (self.direct):
-                       c += self.cache
-               if self.min_mb > 0:
-                       c = c + c%self.min_mb
-                       h = c/1.2
-                       if self.direct:
-                               h = h - self.cache
-               self.container_size = int(c)
+               if size == -1:
+                       c = min(h*1.2, h + 1024) # + 1024 or 20%
+                       c += (self.direct and self.cache) or 0
+                       if self.min_mb > 0:
+                               c = c + c%self.min_mb
+               else:
+                       # do not mess with user input
+                       c = size
+               self.container_size = size
                self.container_cores = self.cores
-               self.heap_size = int(h)
+               self.heap_size = h
 
        def __repr__(self):
                return "<LlapResource heap=%d container=%d>" % (self.heap_size, 
self.container_size)
@@ -47,7 +48,7 @@ def zipdir(path, zip, prefix="."):
                        zip.write(src, dst)
        
 def main(args):
-       opts, args = getopt(args,"",["instances=","output=", "input=","args=","name=","loglevel=","chaosmonkey="])
+       opts, args = getopt(args,"",["instances=","output=", "input=","args=","name=","loglevel=","chaosmonkey=","size=","xmx=", "cache=", "executors="])
        version = os.getenv("HIVE_VERSION")
        if not version:
                version = strftime("%d%b%Y", gmtime()) 
@@ -144,7 +145,7 @@ def main(args):
 
        with open(join(output, "run.sh"), "w") as f:
                f.write(runner % vars)
-       os.chmod(join(output, "run.sh"), 0755)
+       os.chmod(join(output, "run.sh"), 0700)
 
        print "Prepared %s/run.sh for running LLAP on Slider" % (output)
 

http://git-wip-us.apache.org/repos/asf/hive/blob/785ec8e7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index ac8dcba..543e616 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -38,6 +38,7 @@ public class MiniLlapCluster extends AbstractService {
   private final File testWorkDir;
   private final long execBytesPerService;
   private final boolean llapIoEnabled;
+  private final boolean ioIsDirect;
   private final long ioBytesPerService;
   private final int numExecutorsPerService;
   private final String[] localDirs;
@@ -46,15 +47,15 @@ public class MiniLlapCluster extends AbstractService {
   private LlapDaemon llapDaemon;
 
   public static MiniLlapCluster create(String clusterName, int 
numExecutorsPerService,
-                                       long execBytePerService, boolean 
llapIoEnabled,
-                                       long ioBytesPerService, int 
numLocalDirs) {
+      long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long 
ioBytesPerService,
+      int numLocalDirs) {
     return new MiniLlapCluster(clusterName, numExecutorsPerService, 
execBytePerService,
-        llapIoEnabled, ioBytesPerService, numLocalDirs);
+        llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs);
   }
 
   // TODO Add support for multiple instances
   private MiniLlapCluster(String clusterName, int numExecutorsPerService, long 
execMemoryPerService,
-                          boolean llapIoEnabled, long ioBytesPerService, int 
numLocalDirs) {
+                          boolean llapIoEnabled, boolean ioIsDirect, long 
ioBytesPerService, int numLocalDirs) {
     super(clusterName + "_" + MiniLlapCluster.class.getSimpleName());
     Preconditions.checkArgument(numExecutorsPerService > 0);
     Preconditions.checkArgument(execMemoryPerService > 0);
@@ -105,6 +106,7 @@ public class MiniLlapCluster extends AbstractService {
     }
     this.numExecutorsPerService = numExecutorsPerService;
     this.execBytesPerService = execMemoryPerService;
+    this.ioIsDirect = ioIsDirect;
     this.llapIoEnabled = llapIoEnabled;
     this.ioBytesPerService = ioBytesPerService;
 
@@ -120,8 +122,9 @@ public class MiniLlapCluster extends AbstractService {
 
   @Override
   public void serviceInit(Configuration conf) {
-    llapDaemon = new LlapDaemon(conf, numExecutorsPerService, 
execBytesPerService, llapIoEnabled,
-        ioBytesPerService, localDirs, 0, 0);
+    llapDaemon =
+        new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, 
llapIoEnabled,
+            ioIsDirect, ioBytesPerService, localDirs, 0, 0);
     llapDaemon.init(conf);
   }
 

Reply via email to