Repository: tajo
Updated Branches:
  refs/heads/master d99bd085e -> 743c52650


TAJO-743: Change the default resource allocation policy of leaf tasks. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/743c5265
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/743c5265
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/743c5265

Branch: refs/heads/master
Commit: 743c52650d34ed67b777705816a127b7b5f1b0fa
Parents: d99bd08
Author: jinossy <[email protected]>
Authored: Thu Apr 10 15:03:10 2014 +0900
Committer: jinossy <[email protected]>
Committed: Thu Apr 10 15:03:10 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/tajo/conf/TajoConf.java     |  3 +-
 .../tajo/worker/TajoResourceAllocator.java      |  4 ++-
 .../tajo/worker/WorkerHeartbeatService.java     |  9 ++++++
 .../org/apache/tajo/storage/v2/DiskUtil.java    | 32 ++++++++++++--------
 5 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fe2349f..9353512 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -143,6 +143,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-743: Change the default resource allocation policy of leaf tasks. (jinho)
+
     TAJO-717: Improve file splitting for large number of splits. (jinho)
 
     TAJO-356: Improve TajoClient to directly get query results in the first request.

http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index db96eb8..3c81ed5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -105,6 +105,7 @@ public class TajoConf extends Configuration {
     WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024),
     WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
     WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
+    WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false),
 
     // Tajo Worker Dedicated Resources
     WORKER_RESOURCE_DEDICATED("tajo.worker.resource.dedicated", false),
@@ -231,7 +232,7 @@ public class TajoConf extends Configuration {
     //////////////////////////////////
     // Task Configuration
     TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
-    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f),
+    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 0.5f),
     TASK_DEFAULT_SIZE("tajo.task.size-mb", 128),
     //////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index bcf10dd..79f14a2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -93,6 +93,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
                                            int numTasks,
                                            int memoryMBPerTask) {
     //TODO consider disk slot
+
     TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
     int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
     clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
@@ -226,7 +227,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               .setMinMemoryMBPerContainer(requiredMemoryMB)
               .setMaxMemoryMBPerContainer(requiredMemoryMB)
               .setNumContainers(event.getRequiredNum())
-              .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
+              .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
+                  : TajoMasterProtocol.ResourceRequestPriority.DISK)
               .setMinDiskSlotPerContainer(requiredDiskSlots)
               .setMaxDiskSlotPerContainer(requiredDiskSlots)
               .setQueryId(event.getExecutionBlockId().getQueryId().getProto())

http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 3843b8f..007bcbf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -55,6 +55,11 @@ public class WorkerHeartbeatService extends AbstractService {
   private TajoConf systemConf;
   private RpcConnectionPool connectionPool;
   private WorkerHeartbeatThread thread;
+  private static final float HDFS_DATANODE_STORAGE_SIZE;
+
+  static {
+    HDFS_DATANODE_STORAGE_SIZE = DiskUtil.getDataNodeStorageSize();
+  }
 
   public WorkerHeartbeatService(TajoWorker.WorkerContext context) {
     super(WorkerHeartbeatService.class.getSimpleName());
@@ -117,6 +122,10 @@ public class WorkerHeartbeatService extends AbstractService {
         workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
         workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
         workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+
+        if (systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && HDFS_DATANODE_STORAGE_SIZE > 0) {
+          workerDiskSlots = HDFS_DATANODE_STORAGE_SIZE;
+        }
       }
 
       systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()

http://git-wip-us.apache.org/repos/asf/tajo/blob/743c5265/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
index bb90c39..d5873bb 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
@@ -18,17 +18,15 @@
 
 package org.apache.tajo.storage.v2;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.Util;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 public class DiskUtil {
 
@@ -189,7 +187,17 @@ public class DiskUtil {
                        }
                }
        }
-       
+
+  public static int getDataNodeStorageSize(){
+    return getStorageDirs().size();
+  }
+
+  public static List<URI> getStorageDirs(){
+    Configuration conf = new HdfsConfiguration();
+    Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    return Util.stringCollectionAsURIs(dirNames);
+  }
+
        public static void main(String[] args) throws Exception {
                System.out.println("/dev/sde1".split("/").length);
                for(String eachToken: "/dev/sde1".split("/")) {

Reply via email to