Repository: tajo Updated Branches: refs/heads/master d99bd085e -> 743c52650
TAJO-743: Change the default resource allocation policy of leaf tasks. (jinho) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/743c5265 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/743c5265 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/743c5265 Branch: refs/heads/master Commit: 743c52650d34ed67b777705816a127b7b5f1b0fa Parents: d99bd08 Author: jinossy <[email protected]> Authored: Thu Apr 10 15:03:10 2014 +0900 Committer: jinossy <[email protected]> Committed: Thu Apr 10 15:03:10 2014 +0900 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../java/org/apache/tajo/conf/TajoConf.java | 3 +- .../tajo/worker/TajoResourceAllocator.java | 4 ++- .../tajo/worker/WorkerHeartbeatService.java | 9 ++++++ .../org/apache/tajo/storage/v2/DiskUtil.java | 32 ++++++++++++-------- 5 files changed, 36 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fe2349f..9353512 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -143,6 +143,8 @@ Release 0.8.0 - unreleased IMPROVEMENTS + TAJO-743: Change the default resource allocation policy of leaf tasks. (jinho) + TAJO-717: Improve file splitting for large number of splits. (jinho) TAJO-356: Improve TajoClient to directly get query results in the first request. http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index db96eb8..3c81ed5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -105,6 +105,7 @@ public class TajoConf extends Configuration { WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024), WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f), WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2), + WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false), // Tajo Worker Dedicated Resources WORKER_RESOURCE_DEDICATED("tajo.worker.resource.dedicated", false), @@ -231,7 +232,7 @@ public class TajoConf extends Configuration { ////////////////////////////////// // Task Configuration TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512), - TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f), + TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 0.5f), TASK_DEFAULT_SIZE("tajo.task.size-mb", 128), ////////////////////////////////// http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index bcf10dd..79f14a2 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -93,6 +93,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { int numTasks, int memoryMBPerTask) { //TODO consider disk slot + TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource(); int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask; clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot @@ -226,7 +227,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { .setMinMemoryMBPerContainer(requiredMemoryMB) .setMaxMemoryMBPerContainer(requiredMemoryMB) .setNumContainers(event.getRequiredNum()) - .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY) + .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY + : TajoMasterProtocol.ResourceRequestPriority.DISK) .setMinDiskSlotPerContainer(requiredDiskSlots) .setMaxDiskSlotPerContainer(requiredDiskSlots) .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 3843b8f..007bcbf 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -55,6 +55,11 @@ public class WorkerHeartbeatService extends AbstractService { private TajoConf systemConf; private RpcConnectionPool connectionPool; private WorkerHeartbeatThread thread; + private static final float HDFS_DATANODE_STORAGE_SIZE; + + static { + HDFS_DATANODE_STORAGE_SIZE = DiskUtil.getDataNodeStorageSize(); + } public WorkerHeartbeatService(TajoWorker.WorkerContext context) { super(WorkerHeartbeatService.class.getSimpleName()); @@ -117,6 +122,10 @@ public class WorkerHeartbeatService extends AbstractService { workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB); workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS); + + if (systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && HDFS_DATANODE_STORAGE_SIZE > 0) { + workerDiskSlots = HDFS_DATANODE_STORAGE_SIZE; + } } systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder() http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java index bb90c39..d5873bb 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java @@ -18,17 +18,15 @@ package org.apache.tajo.storage.v2; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.common.Util; + +import java.io.*; +import java.net.URI; +import java.util.*; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; public class DiskUtil { @@ -189,7 +187,17 @@ public class DiskUtil { } } } - + + public static int getDataNodeStorageSize(){ + return getStorageDirs().size(); + } + + public static List<URI> getStorageDirs(){ + Configuration conf = new HdfsConfiguration(); + Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY); + return Util.stringCollectionAsURIs(dirNames); + } + public static void main(String[] args) throws Exception { System.out.println("/dev/sde1".split("/").length); for(String eachToken: "/dev/sde1".split("/")) {
