Repository: hive Updated Branches: refs/heads/branch-2.0 2a6ebb885 -> f40c1502d refs/heads/master a75085688 -> 6cc5761b0
HIVE-11358 : LLAP: move LlapConfiguration into HiveConf and document the settings (Sergey Shelukhin, reviewed by Lefty Leverenz, Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6cc5761b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6cc5761b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6cc5761b Branch: refs/heads/master Commit: 6cc5761b0f4d9bb518e1fa7d1f5960e0b07feca5 Parents: a750856 Author: Sergey Shelukhin <ser...@apache.org> Authored: Sat Dec 5 12:59:40 2015 -0800 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Sat Dec 5 12:59:40 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 163 ++++++++++++++++++- data/conf/llap/llap-daemon-site.xml | 9 +- .../llap/configuration/LlapConfiguration.java | 139 +--------------- .../registry/impl/LlapFixedRegistryImpl.java | 18 +- .../llap/registry/impl/LlapRegistryService.java | 4 +- .../registry/impl/LlapYarnRegistryImpl.java | 27 ++- .../hadoop/hive/llap/cli/LlapServiceDriver.java | 30 ++-- .../hive/llap/daemon/impl/AMReporter.java | 20 +-- .../llap/daemon/impl/ContainerRunnerImpl.java | 6 +- .../hive/llap/daemon/impl/LlapDaemon.java | 33 ++-- .../hive/llap/daemon/impl/QueryFileCleaner.java | 6 +- .../hive/llap/daemon/impl/QueryTracker.java | 7 +- .../daemon/services/impl/LlapWebServices.java | 9 +- .../llap/tezplugins/LlapTaskCommunicator.java | 10 +- .../hive/llap/tezplugins/TaskCommunicator.java | 15 +- .../dag/app/rm/LlapTaskSchedulerService.java | 36 ++-- .../hive/llap/daemon/MiniLlapCluster.java | 9 +- .../impl/TestLlapDaemonProtocolServerImpl.java | 8 +- .../app/rm/TestLlapTaskSchedulerService.java | 13 +- 19 files changed, 280 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 803d52b..d52f994 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2370,6 +2370,112 @@ public class HiveConf extends Configuration { LLAP_IO_THREADPOOL_SIZE("hive.llap.io.threadpool.size", 10, "Specify the number of threads to use for low-level IO thread pool."), + LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5, + "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"), + LLAP_DAEMON_WORK_DIRS("hive.llap.daemon.work.dirs", "", + "Working directories for the daemon. Needs to be set for a secure cluster, since LLAP may\n" + + "not have access to the default YARN working directories.", "llap.daemon.work.dirs"), + LLAP_DAEMON_YARN_SHUFFLE_PORT("hive.llap.daemon.yarn.shuffle.port", 15551, + "YARN shuffle port for LLAP-daemon-hosted shuffle.", "llap.daemon.yarn.shuffle.port"), + LLAP_DAEMON_YARN_CONTAINER_MB("hive.llap.daemon.yarn.container.mb", -1, + "TODO doc. Unused?", "llap.daemon.yarn.container.mb"), + LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED("hive.llap.daemon.shuffle.dir.watcher.enabled", false, + "TODO doc", "llap.daemon.shuffle.dir-watcher.enabled"), + LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS( + "hive.llap.daemon.am.liveness.heartbeat.interval.ms", "10000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Tez AM-LLAP heartbeat interval (milliseconds). This needs to be below the task timeout\n" + + "interval, but otherwise as high as possible to avoid unnecessary traffic.", + "llap.daemon.am.liveness.heartbeat.interval-ms"), + LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS( + "hive.llap.am.liveness.connection.timeout.ms", "10000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Amount of time to wait on connection failures to the AM from an LLAP daemon before\n" + + "considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"), + // Not used yet - since the Writable RPC engine does not support this policy. + LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS( + "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Sleep duration while waiting to retry connection failures to the AM from the daemon for\n" + + "the general keep-alive thread (milliseconds).", + "llap.am.liveness.connection.sleep-between-retries-millis"), + LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4, + "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" + + "executed in parallel.", "llap.daemon.num.executors"), + LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 15001, "The LLAP daemon RPC port.", + "llap.daemon.rpc.port"), + LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096, + "The total amount of memory to use for the executors inside LLAP (in megabytes).", + "llap.daemon.memory.per.instance.mb"), + LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4, + "The total number of vcpus to use for the executors inside LLAP.", + "llap.daemon.vcpus.per.instance"), + LLAP_DAEMON_NUM_FILE_CLEANER_THREADS("hive.llap.daemon.num.file.cleaner.threads", 1, + "Number of file cleaner threads in LLAP.", "llap.daemon.num.file.cleaner.threads"), + LLAP_FILE_CLEANUP_DELAY_SECONDS("hive.llap.file.cleanup.delay.seconds", "300s", + new TimeValidator(TimeUnit.SECONDS), + "How long to delay before cleaning up query files in LLAP (in seconds, for debugging).", + "llap.file.cleanup.delay-seconds"), + LLAP_DAEMON_SERVICE_HOSTS("hive.llap.daemon.service.hosts", "", + "Explicitly specified hosts to use for LLAP scheduling. Useful for testing. By default,\n" + + "YARN registry is used.", "llap.daemon.service.hosts"), + LLAP_DAEMON_SERVICE_REFRESH_INTERVAL("hive.llap.daemon.service.refresh.interval.sec", "60s", + new TimeValidator(TimeUnit.SECONDS), + "LLAP YARN registry service list refresh delay, in seconds.", + "llap.daemon.service.refresh.interval"), + LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads", 10, + "Number of threads to use in LLAP task communicator in Tez AM.", + "llap.daemon.communicator.num.threads"), + LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS( + "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Minimum time after which a previously disabled node will be re-enabled for scheduling,\n" + + "in milliseconds. This may be modified by an exponential back-off if failures persist.", + "llap.task.scheduler.node.re-enable.min.timeout.ms"), + LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS( + "hive.llap.task.scheduler.node.reenable.max.timeout.ms", "10000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Maximum time after which a previously disabled node will be re-enabled for scheduling,\n" + + "in milliseconds. This may be modified by an exponential back-off if failures persist.", + "llap.task.scheduler.node.re-enable.max.timeout.ms"), + LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR( + "hive.llap.task.scheduler.node.disable.backoff.factor", 1.5f, + "Backoff factor on successive blacklists of a node due to some failures. Blacklist times\n" + + "start at the min timeout and go up to the max timeout based on this backoff factor.", + "llap.task.scheduler.node.disable.backoff.factor"), + LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE( + "hive.llap.task.scheduler.num.schedulable.tasks.per.node", 0, + "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" + + "this should be picked up from the Registry. -1 indicates unlimited capacity; positive\n" + + "values indicate a specific bound.", "llap.task.scheduler.num.schedulable.tasks.per.node"), + LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size", + 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"), + LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME( + "hive.llap.daemon.wait.queue.comparator.class.name", + "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator", + "The priority comparator to use for LLAP scheduler prioroty queue. The built-in options\n" + + "are org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator and\n" + + ".....FirstInFirstOutComparator", "llap.daemon.wait.queue.comparator.class.name"), + LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION( + "hive.llap.daemon.task.scheduler.enable.preemption", true, + "Whether non-finishable running tasks (e.g. a reducer waiting for inputs) should be\n" + + "preempted by finishable tasks inside LLAP scheduler.", + "llap.daemon.task.scheduler.enable.preemption"), + LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS( + "hive.llap.task.communicator.connection.timeout.ms", "16000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Connection timeout (in milliseconds) before a failure to an LLAP daemon from Tez AM.", + "llap.task.communicator.connection.timeout-millis"), + LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS( + "hive.llap.task.communicator.connection.sleep.between.retries.ms", "2000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Sleep duration (in milliseconds) to wait before retrying on error when obtaining a\n" + + "connection to LLAP daemon from Tez AM.", + "llap.task.communicator.connection.sleep-between-retries-millis"), + LLAP_DAEMON_WEB_PORT("hive.llap.daemon.web.port", 15002, "LLAP daemon web UI port.", + "llap.daemon.service.port"), + LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false, + "Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), @@ -2422,6 +2528,7 @@ public class HiveConf extends Configuration { public final String varname; + private final String altName; private final String defaultExpr; public final String defaultStrVal; @@ -2441,28 +2548,39 @@ public class HiveConf extends Configuration { private final boolean caseSensitive; ConfVars(String varname, Object defaultVal, String description) { - this(varname, defaultVal, null, description, true, false); + this(varname, defaultVal, null, description, true, false, null); + } + + ConfVars(String varname, Object defaultVal, String description, String altName) { + this(varname, defaultVal, null, description, true, false, altName); + } + + ConfVars(String varname, Object defaultVal, Validator validator, String description, + String altName) { + this(varname, defaultVal, validator, description, true, false, altName); } ConfVars(String varname, Object defaultVal, String description, boolean excluded) { - this(varname, defaultVal, null, description, true, excluded); + this(varname, defaultVal, null, description, true, excluded, null); } ConfVars(String varname, String defaultVal, boolean caseSensitive, String description) { - this(varname, defaultVal, null, description, caseSensitive, false); + this(varname, defaultVal, null, description, caseSensitive, false, null); } ConfVars(String varname, Object defaultVal, Validator validator, String description) { - this(varname, defaultVal, validator, description, true, false); + this(varname, defaultVal, validator, description, true, false, null); } - ConfVars(String varname, Object defaultVal, Validator validator, String description, boolean caseSensitive, boolean excluded) { + ConfVars(String varname, Object defaultVal, Validator validator, String description, + boolean caseSensitive, boolean excluded, String altName) { this.varname = varname; this.validator = validator; this.description = description; this.defaultExpr = defaultVal == null ? null : String.valueOf(defaultVal); this.excluded = excluded; this.caseSensitive = caseSensitive; + this.altName = altName; if (defaultVal == null || defaultVal instanceof String) { this.valClass = String.class; this.valType = VarType.STRING; @@ -2704,6 +2822,9 @@ public class HiveConf extends Configuration { public static int getIntVar(Configuration conf, ConfVars var) { assert (var.valClass == Integer.class) : var.varname; + if (var.altName != null) { + return conf.getInt(var.varname, conf.getInt(var.altName, var.defaultIntVal)); + } return conf.getInt(var.varname, var.defaultIntVal); } @@ -2798,10 +2919,16 @@ public class HiveConf extends Configuration { public static long getLongVar(Configuration conf, ConfVars var) { assert (var.valClass == Long.class) : var.varname; + if (var.altName != null) { + return conf.getLong(var.varname, conf.getLong(var.altName, var.defaultLongVal)); + } return conf.getLong(var.varname, var.defaultLongVal); } public static long getLongVar(Configuration conf, ConfVars var, long defaultVal) { + if (var.altName != null) { + return conf.getLong(var.varname, conf.getLong(var.altName, defaultVal)); + } return conf.getLong(var.varname, defaultVal); } @@ -2820,10 +2947,16 @@ public class HiveConf extends Configuration { public static float getFloatVar(Configuration conf, ConfVars var) { assert (var.valClass == Float.class) : var.varname; + if (var.altName != null) { + return conf.getFloat(var.varname, conf.getFloat(var.altName, var.defaultFloatVal)); + } return conf.getFloat(var.varname, var.defaultFloatVal); } public static float getFloatVar(Configuration conf, ConfVars var, float defaultVal) { + if (var.altName != null) { + return conf.getFloat(var.varname, conf.getFloat(var.altName, defaultVal)); + } return conf.getFloat(var.varname, defaultVal); } @@ -2842,10 +2975,16 @@ public class HiveConf extends Configuration { public static boolean getBoolVar(Configuration conf, ConfVars var) { assert (var.valClass == Boolean.class) : var.varname; + if (var.altName != null) { + return conf.getBoolean(var.varname, conf.getBoolean(var.altName, var.defaultBoolVal)); + } return conf.getBoolean(var.varname, var.defaultBoolVal); } public static boolean getBoolVar(Configuration conf, ConfVars var, boolean defaultVal) { + if (var.altName != null) { + return conf.getBoolean(var.varname, conf.getBoolean(var.altName, defaultVal)); + } return conf.getBoolean(var.varname, defaultVal); } @@ -2864,10 +3003,24 @@ public class HiveConf extends Configuration { public static String getVar(Configuration conf, ConfVars var) { assert (var.valClass == String.class) : var.varname; + if (var.altName != null) { + return conf.get(var.varname, conf.get(var.altName, var.defaultStrVal)); + } return conf.get(var.varname, var.defaultStrVal); } + public static String getTrimmedVar(Configuration conf, ConfVars var) { + assert (var.valClass == String.class) : var.varname; + if (var.altName != null) { + return conf.getTrimmed(var.varname, conf.getTrimmed(var.altName, var.defaultStrVal)); + } + return conf.getTrimmed(var.varname, var.defaultStrVal); + } + public static String getVar(Configuration conf, ConfVars var, String defaultVal) { + if (var.altName != null) { + return conf.get(var.varname, conf.get(var.altName, defaultVal)); + } return conf.get(var.varname, defaultVal); } http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/data/conf/llap/llap-daemon-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/llap/llap-daemon-site.xml b/data/conf/llap/llap-daemon-site.xml index f2851a7..cc3e438 100644 --- a/data/conf/llap/llap-daemon-site.xml +++ b/data/conf/llap/llap-daemon-site.xml @@ -16,25 +16,24 @@ See the License for the specific language governing permissions and limitations under the License. --> - <configuration> <property> - <name>llap.daemon.service.hosts</name> + <name>hive.llap.daemon.service.hosts</name> <value>localhost</value> </property> <property> - <name>llap.daemon.service.port</name> + <name>hive.llap.daemon.service.port</name> <value>0</value> </property> <property> - <name>llap.daemon.num.executors</name> + <name>hive.llap.daemon.num.executors</name> <value>4</value> </property> <property> - <name>llap.daemon.task.scheduler.wait.queue.size</name> + <name>hive.llap.daemon.task.scheduler.wait.queue.size</name> <value>4</value> </property> http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index bd09024..abdbc09 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -19,6 +19,8 @@ import java.net.URL; import org.apache.hadoop.conf.Configuration; public class LlapConfiguration extends Configuration { + public static final String LLAP_PREFIX = "llap."; + public static final String LLAP_DAEMON_PREFIX = "llap.daemon."; public LlapConfiguration(Configuration conf) { super(conf); @@ -35,142 +37,5 @@ public class LlapConfiguration extends Configuration { addResource(llapDaemonConfLocation); } - public static final String LLAP_PREFIX = "llap."; - - public static final String LLAP_DAEMON_PREFIX = "llap.daemon."; private static final String LLAP_DAEMON_SITE = "llap-daemon-site.xml"; - - - - public static final String LLAP_DAEMON_RPC_NUM_HANDLERS = LLAP_DAEMON_PREFIX + "rpc.num.handlers"; - public static final int LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT = 5; - - public static final String LLAP_DAEMON_WORK_DIRS = LLAP_DAEMON_PREFIX + "work.dirs"; - - public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port"; - public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551; - - public static final String LLAP_DAEMON_YARN_CONTAINER_MB = LLAP_DAEMON_PREFIX + "yarn.container.mb"; - public static final int LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT = -1; - - public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled"; - public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false; - - // This needs to be kept below the task timeout interval, but otherwise as high as possible to avoid unnecessary traffic. - public static final String LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "am.liveness.heartbeat.interval-ms"; - public static final long LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l; - - /** - * Amount of time to wait on connection failures to the AM from an LLAP daemon before considering - * the AM to be dead - */ - public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS = - LLAP_PREFIX + "am.liveness.connection.timeout-millis"; - public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10000l; - - // Not used yet - since the Writable RPC engine does not support this policy. - /** - * Sleep duration while waiting to retry connection failures to the AM from the daemon for the - * general keep-alive thread - */ - public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS = - LLAP_PREFIX + "am.liveness.connection.sleep-between-retries-millis"; - public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = - 2000l; - - - // Section for configs used in AM and executors - public static final String LLAP_DAEMON_NUM_EXECUTORS = LLAP_DAEMON_PREFIX + "num.executors"; - public static final int LLAP_DAEMON_NUM_EXECUTORS_DEFAULT = 4; - - public static final String LLAP_DAEMON_RPC_PORT = LLAP_DAEMON_PREFIX + "rpc.port"; - public static final int LLAP_DAEMON_RPC_PORT_DEFAULT = 15001; - - public static final String LLAP_DAEMON_MEMORY_PER_INSTANCE_MB = LLAP_DAEMON_PREFIX + "memory.per.instance.mb"; - public static final int LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT = 4096; - - public static final String LLAP_DAEMON_VCPUS_PER_INSTANCE = LLAP_DAEMON_PREFIX + "vcpus.per.instance"; - public static final int LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT = 4; - - public static final String LLAP_DAEMON_NUM_FILE_CLEANER_THREADS = LLAP_DAEMON_PREFIX + "num.file.cleaner.threads"; - public static final int LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT = 1; - - - // Section for configs used in the AM // - public static final String LLAP_FILE_CLEANUP_DELAY_SECONDS = LLAP_PREFIX + "file.cleanup.delay-seconds"; - public static final long LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT = 300; // 5 minutes by default - - public static final String LLAP_DAEMON_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + "service.hosts"; - - public static final String LLAP_DAEMON_SERVICE_REFRESH_INTERVAL = LLAP_DAEMON_PREFIX + "service.refresh.interval"; - public static final int LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT = 60; // seconds - - public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads"; - public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 10; - - /** - * Minimum time after which a previously disabled node will be re-enabled for scheduling. This may - * be modified by an exponential back-off if failures persist - */ - public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS = - LLAP_PREFIX + "task.scheduler.node.re-enable.min.timeout.ms"; - public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT = 200l; - - /** - * Maximum time after which a previously disabled node will be re-enabled for scheduling. This may - * be modified by an exponential back-off if failures persist - */ - public static final String LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS = - LLAP_PREFIX + "task.scheduler.node.re-enable.max.timeout.ms"; - public static final long LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT = 10000l; - - /** - * Backoff factor on successive blacklists of a node. Blacklists timeouts start at the min timeout - * and go up to the max timeout based on this backoff factor - */ - public static final String LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR = - LLAP_PREFIX + "task.scheduler.node.disable.backoff.factor"; - public static final float LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT = 1.5f; - - /** - * The number of tasks the AM TaskScheduler will try allocating per node. - * 0 indicates that this should be picked up from the Registry. - * -1 indicates unlimited capacity - * >0 indicates a specific bound - */ - public static final String LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE = - LLAP_PREFIX + "task.scheduler.num.schedulable.tasks.per.node"; - public static final int LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT = 0; - - public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE = - LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size"; - public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10; - - public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME = - LLAP_DAEMON_PREFIX + "wait.queue.comparator.class.name"; - public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT = - "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator"; - - public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION = - LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption"; - public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = true; - - - /** Amount of time to wait on a connection failure to an LLAP daemon */ - public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS = - LLAP_PREFIX + "task.communicator.connection.timeout-millis"; - public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 16000; - - /** Sleep duration while waiting for a connection failure */ - public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS = - LLAP_PREFIX + "task.communicator.connection.sleep-between-retries-millis"; - public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = 2000l; - - - - public static final String LLAP_DAEMON_SERVICE_PORT = LLAP_DAEMON_PREFIX + "service.port"; - public static final int LLAP_DAEMON_SERVICE_PORT_DEFAULT = 15002; - - public static final String LLAP_DAEMON_SERVICE_SSL = LLAP_DAEMON_PREFIX + "service.ssl"; - public static final boolean LLAP_DAEMON_SERVICE_SSL_DEFAULT = false; } http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 34e0682..a085427 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -24,6 +24,8 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; @@ -53,12 +55,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { public LlapFixedRegistryImpl(String hosts, Configuration conf) { this.hosts = hosts.split(","); - this.port = - conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - this.shuffle = - conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, - LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT); + this.shuffle = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true); for (Map.Entry<String, String> kv : conf) { @@ -70,12 +68,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } } - this.memory = - conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); - this.vcores = - conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + this.memory = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB); + this.vcores = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index a8e1465..740f373 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -16,6 +16,8 @@ package org.apache.hadoop.hive.llap.registry.impl; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; @@ -37,7 +39,7 @@ public class LlapRegistryService extends AbstractService { @Override public void serviceInit(Configuration conf) { - String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); + String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); if (hosts.startsWith("@")) { registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java index d474b6f..2673ad7 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java @@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; @@ -97,37 +99,28 @@ public class LlapYarnRegistryImpl implements ServiceRegistry { encoder = new RegistryUtils.ServiceRecordMarshal(); this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), SERVICE_CLASS, instanceName, "workers"), "worker-"); - refreshDelay = - conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, - LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT); + refreshDelay = HiveConf.getTimeVar( + conf, ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, TimeUnit.SECONDS); this.isDaemon = isDaemon; Preconditions.checkArgument(refreshDelay > 0, "Refresh delay for registry has to be positive = %d", refreshDelay); } public Endpoint getRpcEndpoint() { - final int rpcPort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT); return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); } public Endpoint getShuffleEndpoint() { - final int shufflePort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, - LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + final int shufflePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); // HTTP today, but might not be return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, shufflePort); } public Endpoint getServicesEndpoint() { - final int servicePort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT, - LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT); - final boolean isSSL = - conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL, - LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT); + final int servicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + final boolean isSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); final String scheme = isSSL ? "https" : "http"; final URL serviceURL; try { @@ -238,8 +231,8 @@ public class LlapYarnRegistryImpl implements ServiceRegistry { @Override public Resource getResource() { - int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); - int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS)); + int memory = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname)); + int vCores = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname)); return Resource.newInstance(memory, vCores); } http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index ac7e20c..08d573b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -31,8 +31,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.CompressionUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -128,7 +128,7 @@ public class LlapServiceDriver { // as read by the AM // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between // instances - conf.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, "@" + options.getName()); + conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName()); } if (options.getSize() != -1) { @@ -151,11 +151,11 @@ public class LlapServiceDriver { final long containerSize = options.getSize() / (1024 * 1024); Preconditions.checkArgument(containerSize >= minAlloc, "Container size should be greater than minimum allocation(%s)", minAlloc + "m"); - conf.setLong(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, containerSize); + conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize); } if (options.getExecutors() != -1) { - conf.setLong(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, options.getExecutors()); + conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors()); // TODO: vcpu settings - possibly when DRFA works right } @@ -167,7 +167,7 @@ public class LlapServiceDriver { // Needs more explanation here // Xmx is not the max heap value in JDK8 // You need to subtract 50% of the survivor fraction from this, to get actual usable memory before it goes into GC - conf.setLong(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, (long)(options.getXmx()) + conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, (long)(options.getXmx()) / (1024 * 1024)); } @@ -236,9 +236,8 @@ public class LlapServiceDriver { // extract configs for processing by the python fragments in Slider JSONObject configs = new JSONObject(); - configs.put(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, conf.getInt( - LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, - LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT)); + configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf, + ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB)); configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE)); @@ -246,17 +245,14 @@ public class LlapServiceDriver { configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname, HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT)); - configs.put(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, conf.getInt( - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT)); + configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf, + ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); - configs.put(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, conf.getInt( - LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, - LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT)); + configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf, + ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE)); - configs.put(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, conf.getInt( - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT)); + configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, HiveConf.getIntVar(conf, + ConfVars.LLAP_DAEMON_NUM_EXECUTORS)); configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1)); http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 6d54fd4..f6711d8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -39,6 +39,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; @@ -113,16 +115,14 @@ public class AMReporter extends AbstractService { ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build()); this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2); - this.heartbeatInterval = - conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS, - LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT); - - this.retryTimeout = - conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS, - LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT); - long retrySleep = conf.getLong( - LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS, - LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT); + this.heartbeatInterval = HiveConf.getTimeVar( + conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS); + + this.retryTimeout = HiveConf.getTimeVar( + conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + long retrySleep = HiveConf.getTimeVar( + conf, ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, + TimeUnit.MILLISECONDS); this.retryPolicy = RetryPolicies .retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 4b28b53..2139bb0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -98,9 +99,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.queryTracker = new QueryTracker(conf, localDirsBase); addIfService(queryTracker); - String waitQueueSchedulerClassName = - conf.get(LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME, - LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT); + String waitQueueSchedulerClassName = HiveConf.getVar( + conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName, enablePreemption); http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 98b1ccd..dbdf571 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -28,6 +28,7 @@ import javax.management.ObjectName; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; @@ -104,12 +105,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.numExecutors = numExecutors; this.localDirs = localDirs; - int waitQueueSize = daemonConf.getInt( - LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, - LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT); - boolean enablePreemption = daemonConf.getBoolean( - LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION, - LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT); + int waitQueueSize = HiveConf.getIntVar( + daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE); + boolean enablePreemption = HiveConf.getBoolVar( + daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION); LOG.info("Attempting to start LlapDaemonConf with the following configuration: " + "numExecutors=" + numExecutors + ", rpcListenerPort=" + rpcPort + @@ -135,13 +134,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort); this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs)); - this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED, daemonConf - .getBoolean(LlapConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED, - LlapConfiguration.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT)); + this.shuffleHandlerConf.setBoolean(ShuffleHandler.SHUFFLE_DIR_WATCHER_ENABLED, + HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED)); // Less frequently set parameter, not passing in as a param. - int numHandlers = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS, - LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT); + int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS); // Initialize the metrics system LlapMetricsSystem.initialize("LlapDaemon"); @@ -275,18 +272,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla // Cache settings will need to be setup in llap-daemon-site.xml - since the daemons don't read hive-site.xml // Ideally, these properties should be part of LlapDameonConf rather than HiveConf LlapConfiguration daemonConf = new LlapConfiguration(); - int numExecutors = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); - String[] localDirs = - daemonConf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_WORK_DIRS); - int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + String[] localDirs = daemonConf.getTrimmedStrings(ConfVars.LLAP_DAEMON_WORK_DIRS.varname); + int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT); int shufflePort = daemonConf .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); - long executorMemoryBytes = daemonConf - .getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l; + long executorMemoryBytes = HiveConf.getIntVar( + daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; long cacheMemoryBytes = HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE); boolean isDirectCache = http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java index bc18a77..def1f9b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java @@ -30,6 +30,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.service.AbstractService; import org.apache.tez.common.CallableWithNdc; @@ -46,8 +48,8 @@ public class QueryFileCleaner extends AbstractService { public QueryFileCleaner(Configuration conf, FileSystem localFs) { super(QueryFileCleaner.class.getName()); - int numCleanerThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS, - LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT); + int numCleanerThreads = HiveConf.getIntVar( + conf, ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS); ScheduledExecutorService rawExecutor = Executors.newScheduledThreadPool(numCleanerThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build()); this.executorService = MoreExecutors.listeningDecorator(rawExecutor); http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 2db2833..33d5671 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -16,6 +16,8 @@ package org.apache.hadoop.hive.llap.daemon.impl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; @@ -31,6 +33,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; @@ -83,8 +86,8 @@ public class QueryTracker extends CompositeService { throw new RuntimeException("Failed to setup local filesystem instance", e); } - this.defaultDeleteDelaySeconds = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS, - LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT); + this.defaultDeleteDelaySeconds = HiveConf.getTimeVar( + conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS); queryFileCleaner = new QueryFileCleaner(conf, localFs); addService(queryFileCleaner); http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index 37910be..7856663 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -18,7 +18,8 @@ package org.apache.hadoop.hive.llap.daemon.services.impl; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.webapp.WebApp; @@ -43,10 +44,8 @@ public class LlapWebServices extends AbstractService { this.conf = new Configuration(conf); this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - this.port = conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT, - LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT); - this.ssl = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL, - LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT); + this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + this.ssl = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); this.webAppInstance = new LlapWebApp(); } http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index d327fc0..b93650d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.collect.BiMap; @@ -29,6 +30,8 @@ import com.google.common.collect.HashBiMap; import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; @@ -113,11 +116,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { public void initialize() throws Exception { super.initialize(); Configuration conf = getConf(); - int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS, - LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT); + int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS); this.communicator = new TaskCommunicator(numThreads, conf); - this.deleteDelayOnDagComplete = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS, - LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT); + this.deleteDelayOnDagComplete = HiveConf.getTimeVar( + conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS); LOG.info("Running LlapTaskCommunicator with " + "fileCleanupDelay=" + deleteDelayOnDagComplete + ", numCommunicatorThreads=" + numThreads); http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java index 33e998c..8144165 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java @@ -41,6 +41,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; @@ -79,14 +81,13 @@ public class TaskCommunicator extends AbstractService { this.hostProxies = new ConcurrentHashMap<>(); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); - long connectionTimeout = - conf.getLong(LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS, - LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT); - long retrySleep = conf.getLong( - LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS, - LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT); - this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(connectionTimeout, retrySleep, + long connectionTimeout = HiveConf.getTimeVar(conf, + ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + long retrySleep = HiveConf.getTimeVar(conf, + ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, TimeUnit.MILLISECONDS); + this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( + connectionTimeout, retrySleep, TimeUnit.MILLISECONDS); this.requestManager = new RequestManager(numThreads); ExecutorService localExecutor = Executors.newFixedThreadPool(1, http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index e920f86..9821117 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -50,6 +50,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; @@ -170,34 +172,26 @@ public class LlapTaskSchedulerService extends TaskScheduler { } this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), taskSchedulerContext.getCustomClusterIdentifier()); - this.memoryPerInstance = - conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); - this.coresPerInstance = - conf.getInt(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, - LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT); - this.executorsPerInstance = - conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB); + this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE); + this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); this.nodeBlacklistConf = new NodeBlacklistConf( - conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS, - LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS_DEFAULT), - conf.getLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS, - LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MILLIS_DEFAULT), - conf.getFloat(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR, - LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR_DEFAULT)); + HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS, + TimeUnit.MILLISECONDS), + HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MAX_TIMEOUT_MS, + TimeUnit.MILLISECONDS), + HiveConf.getFloatVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NODE_DISABLE_BACK_OFF_FACTOR)); - this.numSchedulableTasksPerNode = conf.getInt( - LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE, - LlapConfiguration.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE_DEFAULT); + this.numSchedulableTasksPerNode = + HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE); int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance); this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor); - String instanceId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); + String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); - Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + Preconditions.checkNotNull(instanceId, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname + " must be defined"); ExecutorService executorServiceRaw = @@ -999,7 +993,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { if (numSchedulableTasksConf == 0) { int pendingQueueuCapacity = 0; String pendingQueueCapacityString = serviceInstance.getProperties() - .get(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE); + .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname); if (LOG.isDebugEnabled()) { LOG.debug("Setting up node: " + serviceInstance + ", with available capacity=" + serviceInstance.getResource().getVirtualCores() + ", pendingQueueCapacity=" + http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index 4525ab9..52ba360 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; import org.apache.hadoop.service.AbstractService; @@ -150,16 +151,16 @@ public class MiniLlapCluster extends AbstractService { public void serviceStart() { llapDaemon.start(); - clusterSpecificConfiguration.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, + clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, getServiceAddress().getHostName()); - clusterSpecificConfiguration.setInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, getServiceAddress().getPort()); clusterSpecificConfiguration.setInt( - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutorsPerService); clusterSpecificConfiguration.setLong( - LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, execBytesPerService); + ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService); // Optimize local fetch does not work with LLAP due to different local directories // used by containers and LLAP clusterSpecificConfiguration http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java index 8d45c95..bf8a673 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; @@ -33,10 +35,8 @@ public class TestLlapDaemonProtocolServerImpl { @Test(timeout = 10000) public void test() throws ServiceException { LlapConfiguration daemonConf = new LlapConfiguration(); - int rpcPort = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - int numHandlers = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS, - LlapConfiguration.LLAP_DAEMON_RPC_NUM_HANDLERS_DEFAULT); + int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT); + int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS); LlapDaemonProtocolServerImpl server = new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class), new AtomicReference<InetSocketAddress>(), rpcPort); http://git-wip-us.apache.org/repos/asf/hive/blob/6cc5761b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index 23724a4..4eccc06 100644 --- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -323,17 +324,17 @@ public class TestLlapTaskSchedulerService { TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException, InterruptedException { this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4, - LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT); + ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.defaultIntVal); } TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws IOException, InterruptedException { conf = new Configuration(); - conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts); - conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors); - conf.setInt(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE, waitQueueSize); - conf.setLong(LlapConfiguration.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MILLIS, - disableTimeoutMillis); + conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts); + conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors); + conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize); + conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname, + disableTimeoutMillis + "ms"); conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();