Updated Branches:
  refs/heads/master 6cf96bd4e -> fef3dd509

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
deleted file mode 100644
index 4abeefb..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.collect.Lists;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class SchedulerUtils {
-
-  public static class MapComparatorBySize implements Comparator<Map>{
-
-    @Override
-    public int compare(Map m1, Map m2) {
-      return m1.size() - m2.size();
-    }
-  }
-
-  public static List<Map> sortListOfMapsBySize(
-      final List<Map> maplist) {
-    Map[] arr = new Map[maplist.size()];
-    arr = maplist.toArray(arr);
-    Arrays.sort(arr, new MapComparatorBySize());
-
-    List<Map> newlist = Lists.newArrayList(arr);
-    return newlist;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index 754f267..ff73334 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -22,14 +22,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.*;
+import tajo.QueryIdFactory;
 import tajo.QueryUnitId;
 import tajo.SubQueryId;
 import tajo.catalog.*;
@@ -43,6 +44,7 @@ import tajo.engine.planner.PlannerUtil;
 import tajo.engine.planner.logical.*;
 import tajo.master.QueryMaster.QueryContext;
 import tajo.master.event.*;
+import tajo.storage.Fragment;
 import tajo.storage.StorageManager;
 import tajo.util.IndexUtil;
 
@@ -57,38 +59,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static tajo.conf.TajoConf.ConfVars;
 
 
+/**
+ * SubQuery is an instance of an ExecutionBlock.
+ */
 public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private static final Log LOG = LogFactory.getLog(SubQuery.class);
 
-  public enum PARTITION_TYPE {
-    /** for hash partitioning */
-    HASH,
-    LIST,
-    /** for map-side join */
-    BROADCAST,
-    /** for range partitioning */
-    RANGE
-  }
-
-  private SubQueryId id;
-  private LogicalNode plan = null;
-  private StoreTableNode store = null;
-  private List<ScanNode> scanlist = null;
-  private SubQuery next;
-  private Map<ScanNode, SubQuery> childSubQueries;
-  private PARTITION_TYPE outputType;
-  private boolean hasJoinPlan;
-  private boolean hasUnionPlan;
+  private ExecutionBlock block;
   private Priority priority;
   private TableStat stats;
   EventHandler eventHandler;
   final StorageManager sm;
-  private final GlobalPlanner planner;
-  private boolean isLeafQuery = false;
   TaskSchedulerImpl taskScheduler;
   QueryContext queryContext;
-  private Clock clock;
 
   private long startTime;
   private long finishTime;
@@ -106,7 +90,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> 
{
       new StateMachineFactory <SubQuery, SubQueryState,
           SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
 
-          .addTransition(SubQueryState.NEW, EnumSet.of(SubQueryState.INIT, 
SubQueryState.FAILED, SubQueryState.SUCCEEDED),
+          .addTransition(SubQueryState.NEW,
+              EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, 
SubQueryState.SUCCEEDED),
               SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
 
           .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
@@ -148,20 +133,15 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
 
   private int completedTaskCount = 0;
 
-  public SubQuery(SubQueryId id, StorageManager sm, GlobalPlanner planner) {
-    this.id = id;
-    childSubQueries = new HashMap<ScanNode, SubQuery>();
-    scanlist = new ArrayList<ScanNode>();
-    hasJoinPlan = false;
-    hasUnionPlan = false;
+  public SubQuery(QueryContext context, ExecutionBlock block, StorageManager 
sm) {
+    this.queryContext = context;
+    this.block = block;
     this.sm = sm;
-    this.planner = planner;
+    this.eventHandler = context.getEventHandler();
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
-
-
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -190,72 +170,13 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     }
   }
 
-  public void setQueryContext(QueryContext context) {
-    this.queryContext = context;
-  }
-
-  public void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
-  public void setEventHandler(EventHandler eventHandler) {
-    this.eventHandler = eventHandler;
-  }
-
-  public boolean isLeafQuery() {
-    return this.isLeafQuery;
-  }
-
-  public void setLeafQuery() {
-    this.isLeafQuery = true;
+  public ExecutionBlock getBlock() {
+    return block;
   }
 
   public void addTask(QueryUnit task) {
     tasks.put(task.getId(), task);
   }
-  
-  public void setOutputType(PARTITION_TYPE type) {
-    this.outputType = type;
-  }
-
-  public GlobalPlanner getPlanner() {
-    return planner;
-  }
-  
-  public void setLogicalPlan(LogicalNode plan) {
-    hasJoinPlan = false;
-    Preconditions.checkArgument(plan.getType() == ExprType.STORE
-        || plan.getType() == ExprType.CREATE_INDEX);
-
-    this.plan = plan;
-    if (plan instanceof StoreTableNode) {
-      store = (StoreTableNode) plan;      
-    } else {
-      store = (StoreTableNode) ((IndexWriteNode)plan).getSubNode();
-    }
-    
-    LogicalNode node = plan;
-    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
-    s.add(node);
-    while (!s.isEmpty()) {
-      node = s.remove(s.size()-1);
-      if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) node;
-        s.add(s.size(), unary.getSubNode());
-      } else if (node instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) node;
-        if (binary.getType() == ExprType.JOIN) {
-          hasJoinPlan = true;
-        } else if (binary.getType() == ExprType.UNION) {
-          hasUnionPlan = true;
-        }
-        s.add(s.size(), binary.getOuterNode());
-        s.add(s.size(), binary.getInnerNode());
-      } else if (node instanceof ScanNode) {
-        scanlist.add((ScanNode)node);
-      }
-    }
-  }
 
   public void abortSubQuery(SubQueryState finalState) {
     // TODO -
@@ -271,37 +192,6 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     return this.stateMachine;
   }
 
-  public boolean hasJoinPlan() {
-    return this.hasJoinPlan;
-  }
-
-  public boolean hasUnionPlan() {
-    return this.hasUnionPlan;
-  }
-  
-  public void setParentQuery(SubQuery next) {
-    this.next = next;
-  }
-  
-  public void addChildQuery(ScanNode prevscan, SubQuery prev) {
-    childSubQueries.put(prevscan, prev);
-  }
-  
-  public void addChildQueries(Map<ScanNode, SubQuery> prevs) {
-    this.childSubQueries.putAll(prevs);
-  }
-  
-  public void setQueryUnits(List<QueryUnit> queryUnits) {
-    for (QueryUnit task: queryUnits) {
-      tasks.put(task.getId(), task);
-    }
-  }
-  
-  public void removeChildQuery(ScanNode scan) {
-    scanlist.remove(scan);
-    this.childSubQueries.remove(scan);
-  }
-
   public void setPriority(int priority) {
     if (this.priority == null) {
       this.priority = new Priority(priority);
@@ -316,56 +206,12 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     this.stats = stat;
   }
   
-  public SubQuery getParentQuery() {
-    return this.next;
-  }
-  
-  public boolean hasChildQuery() {
-    return !this.childSubQueries.isEmpty();
-  }
-  
-  public Iterator<SubQuery> getChildIterator() {
-    return this.childSubQueries.values().iterator();
-  }
-  
-  public Collection<SubQuery> getChildQueries() {
-    return this.childSubQueries.values();
-  }
-  
-  public Map<ScanNode, SubQuery> getChildMaps() {
-    return this.childSubQueries;
-  }
-  
   public SubQuery getChildQuery(ScanNode scanForChild) {
-    return this.childSubQueries.get(scanForChild);
-  }
-  
-  public String getOutputName() {
-    return this.store.getTableName();
-  }
-  
-  public PARTITION_TYPE getOutputType() {
-    return this.outputType;
-  }
-  
-  public Schema getOutputSchema() {
-    return this.store.getOutSchema();
-  }
-  
-  public StoreTableNode getStoreTableNode() {
-    return this.store;
-  }
-  
-  public ScanNode[] getScanNodes() {
-    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
-  }
-  
-  public LogicalNode getLogicalPlan() {
-    return this.plan;
+    return queryContext.getSubQuery(block.getChildBlock(scanForChild).getId());
   }
   
   public SubQueryId getId() {
-    return this.id;
+    return block.getId();
   }
   
   public QueryUnit[] getQueryUnits() {
@@ -387,13 +233,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
 
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append(this.id);
-/*    sb.append(" plan: " + plan.toString());
-    sb.append("next: " + next + " childSubQueries:");
-    Iterator<SubQuery> it = getChildIterator();
-    while (it.hasNext()) {
-      sb.append(" " + it.next());
-    }*/
+    sb.append(this.getId());
     return sb.toString();
   }
   
@@ -401,18 +241,18 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
   public boolean equals(Object o) {
     if (o instanceof SubQuery) {
       SubQuery other = (SubQuery)o;
-      return this.id.equals(other.getId());
+      return getId().equals(other.getId());
     }
     return false;
   }
   
   @Override
   public int hashCode() {
-    return this.id.hashCode();
+    return getId().hashCode();
   }
   
   public int compareTo(SubQuery other) {
-    return this.id.compareTo(other.id);
+    return getId().compareTo(other.getId());
   }
 
   public SubQueryState getState() {
@@ -439,8 +279,11 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     int numBlocks = 0, numPartitions = 0;
     List<ColumnStat> columnStats = Lists.newArrayList();
 
-    for (SubQuery child : unit.getChildQueries()) {
-      childStat = child.getStats();
+    Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator();
+    while (it.hasNext()) {
+      ExecutionBlock block = it.next();
+      SubQuery childSubQuery = unit.queryContext.getSubQuery(block.getId());
+      childStat = childSubQuery.getStats();
       avgRows += childStat.getAvgRows();
       columnStats.addAll(childStat.getColumnStats());
       numBlocks += childStat.getNumBlocks();
@@ -448,6 +291,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       numPartitions += childStat.getNumPartitions();
       numRows += childStat.getNumRows();
     }
+
     stat.setColumnStats(columnStats);
     stat.setNumBlocks(numBlocks);
     stat.setNumBytes(numBytes);
@@ -458,7 +302,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
   }
 
   public void cleanUp() {
-    if (hasUnionPlan()) {
+    if (block.hasUnion()) {
       try {
         // write meta and continue
         TableStat stat = generateUnionStat(this);
@@ -469,35 +313,38 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
         e.printStackTrace();
       }
     } else {
+      LOG.info("SubQuery: " + getId() + " sets TableStat");
       TableStat stat = generateStat();
-      setStats(stat);
       try {
         writeStat(this, stat);
       } catch (IOException e) {
       }
     }
 
-    finishTime = clock.getTime();
+    finishTime = queryContext.getClock().getTime();
   }
 
 
-  private static class InitAndRequestContainer implements 
MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+  private static class InitAndRequestContainer implements 
MultipleArcTransition<SubQuery,
+      SubQueryEvent, SubQueryState> {
 
     @Override
     public SubQueryState transition(SubQuery subQuery, SubQueryEvent 
subQueryEvent) {
-      subQuery.startTime = subQuery.clock.getTime();
+      subQuery.startTime = subQuery.queryContext.getClock().getTime();
       subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.queryContext);
       subQuery.taskScheduler.init(subQuery.queryContext.getConf());
       subQuery.taskScheduler.start();
 
+      ExecutionBlock execBlock = subQuery.getBlock();
+
       try {
         // if subquery is dummy, which means it requires only a logical step
         // instead of actual query. An 'union all' is an example of
         // a dummy subquery.
-        if (subQuery.hasUnionPlan()) {
+        if (execBlock.hasUnion()) {
           subQuery.finishUnionUnit();
           subQuery.cleanUp();
-          TableMeta meta = new TableMetaImpl(subQuery.getOutputSchema(),
+          TableMeta meta = new TableMetaImpl(execBlock.getOutputSchema(),
               StoreType.CSV, new Options(), subQuery.getStats());
           subQuery.eventHandler.handle(new 
SubQuerySucceeEvent(subQuery.getId(),
               meta));
@@ -505,35 +352,39 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
         } else {
           QueryUnit [] tasks;
           // TODO - should be improved
-          if (subQuery.isLeafQuery() && subQuery.getScanNodes().length == 1) {
-            SubQuery parent = subQuery.getParentQuery();
+          if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) 
{
+
             // if parent is join, this subquery is for partitioning data.
-            if (parent != null) {
+            if (execBlock.hasParentBlock()) {
               int numTasks = calculatePartitionNum(subQuery);
-              subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, 
numTasks);
+              Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
             }
 
-            tasks = subQuery.getPlanner().createLeafTasks(subQuery);
-          } else if (subQuery.getScanNodes().length > 1) {
-            SubQuery parent = subQuery.getParentQuery();
+            tasks = createLeafTasks(subQuery);
+          } else if (execBlock.getScanNodes().length > 1) {
             // if parent is join, this subquery is for partitioning data.
-            if (parent != null) {
+            if (execBlock.hasParentBlock()) {
               int numTasks = calculatePartitionNum(subQuery);
-              subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, 
numTasks);
+              Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
+            }
+
+            if (subQuery.getId().getId() == 15) {
+              System.out.println("error point!");
             }
+
             tasks = Repartitioner.createJoinTasks(subQuery);
 
           } else {
-            SubQuery parent = subQuery.getParentQuery();
             // if parent is join, this subquery is for partitioning data.
-            if (parent != null) {
+            if (execBlock.hasParentBlock()) {
               int partitionNum = calculatePartitionNum(subQuery);
-              subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, 
partitionNum);
+              Repartitioner.setPartitionNumberForTwoPhase(subQuery, 
partitionNum);
             }
             int numTasks = getNonLeafTaskNum(subQuery);
 
-            tasks = Repartitioner.createNonLeafTask(subQuery,
-                subQuery.getChildIterator().next(), numTasks);
+            SubQueryId childId = 
subQuery.getBlock().getChildBlocks().iterator().next().getId();
+            SubQuery child = subQuery.queryContext.getSubQuery(childId);
+            tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
           }
           for (QueryUnit task : tasks) {
             subQuery.addTask(task);
@@ -543,7 +394,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
           // if there is no tasks
           if (subQuery.tasks.size() == 0) {
             subQuery.cleanUp();
-            TableMeta meta = toTableMeta(subQuery.getStoreTableNode());
+            TableMeta meta = toTableMeta(execBlock.getStoreTableNode());
             meta.setStat(subQuery.getStats());
             subQuery.eventHandler.handle(new 
SubQuerySucceeEvent(subQuery.getId(),
                 meta));
@@ -565,10 +416,10 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
             org.apache.hadoop.yarn.api.records.Priority priority =
                 RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
                     org.apache.hadoop.yarn.api.records.Priority.class);
-            priority.setPriority(100 - subQuery.getPriority().get());
+            priority.setPriority(subQuery.getPriority().get());
             ContainerAllocationEvent event =
                 new 
ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
-                    subQuery.getId(), priority, resource, numRequest, 
subQuery.isLeafQuery(), 0.0f);
+                    subQuery.getId(), priority, resource, numRequest, 
execBlock.isLeafBlock(), 0.0f);
             subQuery.eventHandler.handle(event);
           }
         }
@@ -582,6 +433,48 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
         return SubQueryState.FAILED;
       }
     }
+
+    public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      ScanNode[] scans = execBlock.getScanNodes();
+      Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+      TableMeta meta;
+      Path inputPath;
+
+      ScanNode scan = scans[0];
+      TableDesc desc = 
subQuery.queryContext.getCatalog().getTableDesc(scan.getTableId());
+      inputPath = desc.getPath();
+      meta = desc.getMeta();
+
+      // TODO - should be change the inner directory
+      Path oldPath = new Path(inputPath, "data");
+      FileSystem fs = inputPath.getFileSystem(subQuery.queryContext.getConf());
+      if (fs.exists(oldPath)) {
+        inputPath = oldPath;
+      }
+      List<Fragment> fragments = 
subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
+
+      QueryUnit queryUnit;
+      List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
+
+      int i = 0;
+      for (Fragment fragment : fragments) {
+        queryUnit = newQueryUnit(subQuery, i++);
+        queryUnit.setFragment(scan.getTableId(), fragment);
+        queryUnits.add(queryUnit);
+      }
+
+      return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
+    }
+
+    private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit unit = new QueryUnit(
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), 
execBlock.isLeafBlock(),
+          subQuery.eventHandler);
+      unit.setLogicalPlan(execBlock.getPlan());
+      return unit;
+    }
   }
 
   /**
@@ -593,25 +486,25 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
    */
   public static int calculatePartitionNum(SubQuery subQuery) {
     TajoConf conf = subQuery.queryContext.getConf();
-    SubQuery parent = subQuery.getParentQuery();
+    ExecutionBlock parent = subQuery.getBlock().getParentBlock();
 
     GroupbyNode grpNode = null;
     if (parent != null) {
       grpNode = (GroupbyNode) PlannerUtil.findTopNode(
-          parent.getLogicalPlan(), ExprType.GROUP_BY);
+          parent.getPlan(), ExprType.GROUP_BY);
     }
 
     // Is this subquery the first step of join?
     if (parent != null && parent.getScanNodes().length == 2) {
-      Iterator<SubQuery> child = parent.getChildQueries().iterator();
+      Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
 
       // for inner
-      SubQuery outer = child.next();
-      long outerVolume = getInputVolume(outer);
+      ExecutionBlock outer = child.next();
+      long outerVolume = getInputVolume(subQuery.queryContext, outer);
 
       // for inner
-      SubQuery inner = child.next();
-      long innerVolume = getInputVolume(inner);
+      ExecutionBlock inner = child.next();
+      long innerVolume = getInputVolume(subQuery.queryContext, inner);
       LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
       LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
 
@@ -631,7 +524,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       if (grpNode.getGroupingColumns().length == 0) {
         return 1;
       } else {
-        long volume = getInputVolume(subQuery);
+        long volume = getInputVolume(subQuery.queryContext, subQuery.block);
 
         int mb = (int) Math.ceil((double)volume / 1048576);
         LOG.info("Table's volume is approximately " + mb + " MB");
@@ -643,7 +536,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       }
     } else {
       LOG.info("============>>>>> Unexpected Case! <<<<<================");
-      long volume = getInputVolume(subQuery);
+      long volume = getInputVolume(subQuery.queryContext, subQuery.block);
 
       int mb = (int) Math.ceil((double)volume / 1048576);
       LOG.info("Table's volume is approximately " + mb + " MB");
@@ -654,20 +547,20 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     }
   }
 
-  public static long getInputVolume(SubQuery subQuery) {
-    CatalogService catalog = subQuery.queryContext.getCatalog();
-    if (subQuery.hasChildQuery()) {
-      Iterator<SubQuery> it = subQuery.getChildQueries().iterator();
+  public static long getInputVolume(QueryContext context, ExecutionBlock 
execBlock) {
+    CatalogService catalog = context.getCatalog();
+    if (execBlock.isLeafBlock()) {
+      ScanNode outerScan = execBlock.getScanNodes()[0];
+      TableStat stat = 
catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
+      return stat.getNumBytes();
+    } else {
       long aggregatedVolume = 0;
-      while(it.hasNext()) {
-        aggregatedVolume += it.next().getStats().getNumBytes();
+      for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
+        SubQuery subquery = context.getSubQuery(childBlock.getId());
+        aggregatedVolume += subquery.getStats().getNumBytes();
       }
 
       return aggregatedVolume;
-    } else {
-      ScanNode outerScan = subQuery.getScanNodes()[0];
-      TableStat stat = 
catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
-      return stat.getNumBytes();
     }
   }
 
@@ -679,7 +572,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
    */
   public static int getNonLeafTaskNum(SubQuery subQuery) {
     // Getting intermediate data size
-    long volume = getInputVolume(subQuery);
+    long volume = getInputVolume(subQuery.queryContext, subQuery.getBlock());
 
     int mb = (int) Math.ceil((double)volume / 1048576);
     LOG.info("Table's volume is approximately " + mb + " MB");
@@ -766,6 +659,8 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       // TODO - records succeeded, failed, killed completed task
       // TODO - records metrics
 
+      ExecutionBlock execBlock = subQuery.getBlock();
+
       for (Entry<ContainerId, Container> entry : 
subQuery.containers.entrySet()) {
         subQuery.eventHandler.handle(new TaskRunnerStopEvent(subQuery.getId(),
             entry.getValue()));
@@ -773,13 +668,13 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       subQuery.cleanUp();
       subQuery.taskScheduler.stop();
 
-      StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+      StoreTableNode storeTableNode = execBlock.getStoreTableNode();
       TableMeta meta = toTableMeta(storeTableNode);
       meta.setStat(subQuery.getStats());
 
       subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
           meta));
-      subQuery.finishTime = subQuery.clock.getTime();
+      subQuery.finishTime = subQuery.queryContext.getClock().getTime();
     }
   }
 
@@ -809,15 +704,16 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
 
   private void writeStat(SubQuery subQuery, TableStat stat)
       throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
 
-    if (subQuery.getLogicalPlan().getType() == ExprType.CREATE_INDEX) {
-      IndexWriteNode index = (IndexWriteNode) subQuery.getLogicalPlan();
+    if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
+      IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
       Path indexPath = new Path(sm.getTablePath(index.getTableName()), 
"index");
       TableMeta meta;
       if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
         meta = sm.getTableMeta(indexPath);
       } else {
-        StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+        StoreTableNode storeTableNode = execBlock.getStoreTableNode();
         meta = toTableMeta(storeTableNode);
       }
       String indexName = IndexUtil.getIndexName(index.getTableName(),
@@ -828,10 +724,10 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       sm.writeTableMeta(indexPath, meta);
 
     } else {
-      StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+      StoreTableNode storeTableNode = execBlock.getStoreTableNode();
       TableMeta meta = toTableMeta(storeTableNode);
       meta.setStat(stat);
-      sm.writeTableMeta(sm.getTablePath(subQuery.getOutputName()), meta);
+      sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
     }
   }
 
@@ -845,19 +741,6 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     }
   }
 
-  private void finalizePrevSubQuery(SubQuery subQuery)
-      throws Exception {
-    SubQuery prevSubQuery;
-    for (ScanNode scan : subQuery.getScanNodes()) {
-      prevSubQuery = subQuery.getChildQuery(scan);
-      if (prevSubQuery.getStoreTableNode().getSubNode().getType() != 
ExprType.UNION) {
-        for (QueryUnit unit : prevSubQuery.getQueryUnits()) {
-          //sendCommand(unit.getLastAttempt(), CommandType.FINALIZE);
-        }
-      }
-    }
-  }
-
   @Override
   public void handle(SubQueryEvent event) {
     //if (LOG.isDebugEnabled()) {
@@ -871,14 +754,14 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
         getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
         LOG.error("Can't handle this event at current state", e);
-        eventHandler.handle(new SubQueryEvent(this.id,
+        eventHandler.handle(new SubQueryEvent(getId(),
             SubQueryEventType.SQ_INTERNAL_ERROR));
       }
 
       //notify the eventhandler of state change
       if (LOG.isDebugEnabled()) {
         if (oldState != getState()) {
-          LOG.debug(id + " SubQuery Transitioned from " + oldState + " to "
+          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to 
"
               + getState());
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
deleted file mode 100644
index 8756dc4..0000000
--- 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.service.AbstractService;
-
-public class TaskContainerManager extends AbstractService {
-
-  public TaskContainerManager() {
-    super(TaskContainerManager.class.getName());
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
index 3f92608..816e285 100644
--- 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
+++ 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
@@ -389,8 +389,7 @@ public class TaskSchedulerImpl extends AbstractService
           LOG.debug("Assigned based on * match");
 
           QueryUnit task;
-          task = context.getQuery()
-              
.getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
+          task = 
context.getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
               Lists.newArrayList(task.getAllFragments()),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
index 93b786d..43fc1d0 100644
--- 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
+++ 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
@@ -126,7 +126,7 @@ public class WorkerListener extends AbstractService
                    QueryUnitAttemptIdProto attemptIdProto,
                    RpcCallback<BoolProto> done) {
     QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-    
context.getQuery(attemptId.getQueryId()).getContext().getQuery().getSubQuery(attemptId.getSubQueryId()).
+    
context.getQuery(attemptId.getQueryId()).getContext().getSubQuery(attemptId.getSubQueryId()).
         getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
         resetExpireTime();
     done.run(TRUE_PROTO);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
index 28ccbfe..d0fae9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
@@ -47,7 +47,7 @@ import tajo.engine.planner.logical.StoreTableNode;
 import tajo.engine.planner.physical.PhysicalExec;
 import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
 import tajo.ipc.protocolrecords.QueryUnitRequest;
-import tajo.master.SubQuery.PARTITION_TYPE;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.rpc.NullCallback;
 import tajo.storage.Fragment;
 import tajo.storage.StorageUtil;
@@ -101,7 +101,7 @@ public class Task {
   private AtomicBoolean progressFlag = new AtomicBoolean(false);
 
   // TODO - to be refactored
-  private PARTITION_TYPE partitionType = null;
+  private PartitionType partitionType = null;
   private Schema finalSchema = null;
   private TupleComparator sortComp = null;
 
@@ -153,7 +153,7 @@ public class Task {
       context.setInterQuery();
       StoreTableNode store = (StoreTableNode) plan;
       this.partitionType = store.getPartitionType();
-      if (partitionType == PARTITION_TYPE.RANGE) {
+      if (partitionType == PartitionType.RANGE) {
         SortNode sortNode = (SortNode) store.getSubNode();
         this.finalSchema = 
PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, 
sortNode.getSortKeys());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
 
b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index 2af74bb..be8a7f1 100644
--- 
a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ 
b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -46,9 +46,9 @@ import tajo.engine.planner.LogicalPlanner;
 import tajo.engine.planner.PlanningContext;
 import tajo.engine.planner.global.MasterPlan;
 import tajo.engine.planner.logical.*;
+import tajo.master.ExecutionBlock;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.master.GlobalPlanner;
-import tajo.master.SubQuery;
-import tajo.master.SubQuery.PARTITION_TYPE;
 import tajo.master.TajoMaster;
 import tajo.storage.*;
 
@@ -159,11 +159,11 @@ public class TestGlobalQueryPlanner {
     MasterPlan globalPlan = planner.build(queryId,
         (LogicalRootNode) plan1);
 
-    SubQuery unit = globalPlan.getRoot();
-    assertFalse(unit.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, unit.getOutputType());
+    ExecutionBlock unit = globalPlan.getRoot();
+    assertFalse(unit.hasChildBlock());
+    assertEquals(PartitionType.LIST, unit.getPartitionType());
 
-    LogicalNode plan2 = unit.getLogicalPlan();
+    LogicalNode plan2 = unit.getPlan();
     assertEquals(ExprType.STORE, plan2.getType());
     assertEquals(ExprType.SCAN, 
((StoreTableNode)plan2).getSubNode().getType());
   }
@@ -178,20 +178,20 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery next, prev;
+    ExecutionBlock next, prev;
     
     next = globalPlan.getRoot();
-    assertTrue(next.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
+    assertTrue(next.hasChildBlock());
+    assertEquals(PartitionType.LIST, next.getPartitionType());
     for (ScanNode scan : next.getScanNodes()) {
       assertTrue(scan.isLocal());
     }
     assertFalse(next.getStoreTableNode().isLocal());
-    Iterator<SubQuery> it= next.getChildIterator();
+    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
     
     prev = it.next();
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.HASH, prev.getPartitionType());
     assertTrue(prev.getStoreTableNode().isLocal());
     assertFalse(it.hasNext());
     
@@ -216,26 +216,26 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery next, prev;
+    ExecutionBlock next, prev;
     
     next = globalPlan.getRoot();
     assertEquals(ExprType.PROJECTION,
         next.getStoreTableNode().getSubNode().getType());
-    assertTrue(next.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
-    Iterator<SubQuery> it= next.getChildIterator();
+    assertTrue(next.hasChildBlock());
+    assertEquals(PartitionType.LIST, next.getPartitionType());
+    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
 
     prev = it.next();
     assertEquals(ExprType.SORT,
         prev.getStoreTableNode().getSubNode().getType());
-    assertTrue(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
-    it= prev.getChildIterator();
+    assertTrue(prev.hasChildBlock());
+    assertEquals(PartitionType.LIST, prev.getPartitionType());
+    it= prev.getChildBlocks().iterator();
     next = prev;
     
     prev = it.next();
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.RANGE, prev.getPartitionType());
     assertFalse(it.hasNext());
     
     ScanNode []scans = prev.getScanNodes();
@@ -259,59 +259,59 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery next, prev;
+    ExecutionBlock next, prev;
     
     // the second phase of the sort
     next = globalPlan.getRoot();
-    assertTrue(next.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
+    assertTrue(next.hasChildBlock());
+    assertEquals(PartitionType.LIST, next.getPartitionType());
     assertEquals(ExprType.PROJECTION, 
next.getStoreTableNode().getSubNode().getType());
     ScanNode []scans = next.getScanNodes();
     assertEquals(1, scans.length);
-    Iterator<SubQuery> it= next.getChildIterator();
+    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
 
     prev = it.next();
     assertEquals(ExprType.SORT, 
prev.getStoreTableNode().getSubNode().getType());
-    assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
+    assertEquals(PartitionType.LIST, prev.getPartitionType());
     scans = prev.getScanNodes();
     assertEquals(1, scans.length);
-    it= prev.getChildIterator();
+    it= prev.getChildBlocks().iterator();
     
     // the first phase of the sort
     prev = it.next();
     assertEquals(ExprType.SORT, 
prev.getStoreTableNode().getSubNode().getType());
     assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
-    assertTrue(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType());
+    assertTrue(prev.hasChildBlock());
+    assertEquals(PartitionType.RANGE, prev.getPartitionType());
     assertFalse(it.hasNext());
     scans = prev.getScanNodes();
     assertEquals(1, scans.length);
     next = prev;
-    it= next.getChildIterator();
+    it= next.getChildBlocks().iterator();
     
     // the second phase of the join
     prev = it.next();
     assertEquals(ExprType.JOIN, 
prev.getStoreTableNode().getSubNode().getType());
     assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
-    assertTrue(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
+    assertTrue(prev.hasChildBlock());
+    assertEquals(PartitionType.LIST, prev.getPartitionType());
     assertFalse(it.hasNext());
     scans = prev.getScanNodes();
     assertEquals(2, scans.length);
     next = prev;
-    it= next.getChildIterator();
+    it= next.getChildBlocks().iterator();
     
     // the first phase of the join
     prev = it.next();
     assertEquals(ExprType.SCAN, 
prev.getStoreTableNode().getSubNode().getType());
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.HASH, prev.getPartitionType());
     assertEquals(1, prev.getScanNodes().length);
     
     prev = it.next();
     assertEquals(ExprType.SCAN, 
prev.getStoreTableNode().getSubNode().getType());
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.HASH, prev.getPartitionType());
     assertEquals(1, prev.getScanNodes().length);
     assertFalse(it.hasNext());
   }
@@ -325,15 +325,15 @@ public class TestGlobalQueryPlanner {
     
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery unit = globalPlan.getRoot();
+    ExecutionBlock unit = globalPlan.getRoot();
     StoreTableNode store = unit.getStoreTableNode();
     assertEquals(ExprType.JOIN, store.getSubNode().getType());
-    assertTrue(unit.hasChildQuery());
+    assertTrue(unit.hasChildBlock());
     ScanNode [] scans = unit.getScanNodes();
     assertEquals(2, scans.length);
-    SubQuery prev;
+    ExecutionBlock prev;
     for (ScanNode scan : scans) {
-      prev = unit.getChildQuery(scan);
+      prev = unit.getChildBlock(scan);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.SCAN, store.getSubNode().getType());
     }
@@ -348,14 +348,14 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery unit = globalPlan.getRoot();
+    ExecutionBlock unit = globalPlan.getRoot();
     StoreTableNode store = unit.getStoreTableNode();
     assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
 
     ScanNode[] scans = unit.getScanNodes();
     assertEquals(1, scans.length);
 
-    unit = unit.getChildQuery(scans[0]);
+    unit = unit.getChildBlock(scans[0]);
     store = unit.getStoreTableNode();
     assertEquals(ExprType.UNION, store.getSubNode().getType());
     UnionNode union = (UnionNode) store.getSubNode();
@@ -367,11 +367,11 @@ public class TestGlobalQueryPlanner {
     union = (UnionNode) union.getInnerNode();
     assertEquals(ExprType.SCAN, union.getOuterNode().getType());
     assertEquals(ExprType.SCAN, union.getInnerNode().getType());
-    assertTrue(unit.hasChildQuery());
+    assertTrue(unit.hasChildBlock());
     
     String tableId = "";
     for (ScanNode scan : unit.getScanNodes()) {
-      SubQuery prev = unit.getChildQuery(scan);
+      ExecutionBlock prev = unit.getChildBlock(scan);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.GROUP_BY, store.getSubNode().getType());
       GroupbyNode groupby = (GroupbyNode) store.getSubNode();
@@ -382,7 +382,7 @@ public class TestGlobalQueryPlanner {
         assertEquals(tableId, store.getTableName());
       }
       assertEquals(1, prev.getScanNodes().length);
-      prev = prev.getChildQuery(prev.getScanNodes()[0]);
+      prev = prev.getChildBlock(prev.getScanNodes()[0]);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.GROUP_BY, store.getSubNode().getType());
       groupby = (GroupbyNode) store.getSubNode();
@@ -486,13 +486,13 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery second, first, mid;
+    ExecutionBlock second, first, mid;
     ScanNode secondScan, firstScan, midScan;
 
     second = globalPlan.getRoot();
     assertTrue(second.getScanNodes().length == 1);
 
-    first = second.getChildQuery(second.getScanNodes()[0]);
+    first = second.getChildBlock(second.getScanNodes()[0]);
 
     GroupbyNode firstGroupby, secondGroupby, midGroupby;
     secondGroupby = (GroupbyNode) second.getStoreTableNode().getSubNode();
@@ -510,10 +510,10 @@ public class TestGlobalQueryPlanner {
     midScan = mid.getScanNodes()[0];
     firstScan = first.getScanNodes()[0];
 
-    assertTrue(first.getParentQuery().equals(mid));
-    assertTrue(mid.getParentQuery().equals(second));
-    assertTrue(second.getChildQuery(secondScan).equals(mid));
-    assertTrue(mid.getChildQuery(midScan).equals(first));
+    assertTrue(first.getParentBlock().equals(mid));
+    assertTrue(mid.getParentBlock().equals(second));
+    assertTrue(second.getChildBlock(secondScan).equals(mid));
+    assertTrue(mid.getChildBlock(midScan).equals(first));
     assertEquals(first.getOutputName(), midScan.getTableId());
     assertEquals(first.getOutputSchema(), midScan.getInSchema());
     assertEquals(mid.getOutputName(), secondScan.getTableId());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
 
b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index fae28f7..c0d0dd2 100644
--- 
a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ 
b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -43,8 +43,8 @@ import tajo.engine.planner.LogicalOptimizer;
 import tajo.engine.planner.LogicalPlanner;
 import tajo.engine.planner.PlanningContext;
 import tajo.engine.planner.logical.*;
+import tajo.master.ExecutionBlock;
 import tajo.master.GlobalPlanner;
-import tajo.master.SubQuery;
 import tajo.storage.*;
 
 import java.io.IOException;
@@ -144,7 +144,7 @@ public class TestGlobalQueryOptimizer {
         (LogicalRootNode) plan);
     globalPlan = optimizer.optimize(globalPlan);
     
-    SubQuery unit = globalPlan.getRoot();
+    ExecutionBlock unit = globalPlan.getRoot();
     StoreTableNode store = unit.getStoreTableNode();
     assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
     ProjectionNode proj = (ProjectionNode) store.getSubNode();
@@ -153,16 +153,16 @@ public class TestGlobalQueryOptimizer {
     assertEquals(ExprType.SCAN, sort.getSubNode().getType());
     ScanNode scan = (ScanNode) sort.getSubNode();
     
-    assertTrue(unit.hasChildQuery());
-    unit = unit.getChildQuery(scan);
+    assertTrue(unit.hasChildBlock());
+    unit = unit.getChildBlock(scan);
     store = unit.getStoreTableNode();
     assertEquals(ExprType.SORT, store.getSubNode().getType());
     sort = (SortNode) store.getSubNode();
     assertEquals(ExprType.JOIN, sort.getSubNode().getType());
     
-    assertTrue(unit.hasChildQuery());
+    assertTrue(unit.hasChildBlock());
     for (ScanNode prevscan : unit.getScanNodes()) {
-      SubQuery prev = unit.getChildQuery(prevscan);
+      ExecutionBlock prev = unit.getChildBlock(prevscan);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.SCAN, store.getSubNode().getType());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
 
b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 6b8496a..8b5dee5 100644
--- 
a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ 
b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -46,7 +46,7 @@ import tajo.engine.planner.logical.LogicalNode;
 import tajo.engine.planner.logical.LogicalRootNode;
 import tajo.engine.planner.logical.StoreTableNode;
 import tajo.engine.planner.logical.UnionNode;
-import tajo.master.SubQuery;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.master.TajoMaster;
 import tajo.storage.*;
 import tajo.storage.index.bst.BSTIndex;
@@ -424,7 +424,7 @@ public class TestPhysicalPlanner {
     Column key1 = new Column("score.deptName", DataType.STRING);
     Column key2 = new Column("score.class", DataType.STRING);
     StoreTableNode storeNode = new StoreTableNode("partition");
-    storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[]{key1, 
key2}, numPartitions);
+    storeNode.setPartitions(PartitionType.HASH, new Column[]{key1, key2}, 
numPartitions);
     PlannerUtil.insertNode(plan, storeNode);
     plan = LogicalOptimizer.optimize(context, plan);
 
@@ -482,7 +482,7 @@ public class TestPhysicalPlanner {
 
     int numPartitions = 1;
     StoreTableNode storeNode = new StoreTableNode("emptyset");
-    storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[] {}, 
numPartitions);
+    storeNode.setPartitions(PartitionType.HASH, new Column[] {}, 
numPartitions);
     PlannerUtil.insertNode(plan, storeNode);
     plan = LogicalOptimizer.optimize(context, plan);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
 
b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
new file mode 100644
index 0000000..c6a5d43
--- /dev/null
+++ 
b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import tajo.QueryIdFactory;
+import tajo.TajoTestingCluster;
+import tajo.benchmark.TPCH;
+import tajo.catalog.CatalogService;
+import tajo.catalog.TCatUtil;
+import tajo.catalog.TableDesc;
+import tajo.catalog.TableMeta;
+import tajo.catalog.proto.CatalogProtos;
+import tajo.conf.TajoConf;
+import tajo.engine.parser.QueryAnalyzer;
+import tajo.engine.planner.LogicalPlanner;
+import tajo.engine.planner.PlanningContext;
+import tajo.engine.planner.global.MasterPlan;
+import tajo.engine.planner.logical.LogicalNode;
+import tajo.engine.planner.logical.LogicalRootNode;
+import tajo.storage.StorageManager;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExecutionBlockCursor {
+  private static TajoTestingCluster util;
+  private static TajoConf conf;
+  private static CatalogService catalog;
+  private static GlobalPlanner planner;
+  private static QueryAnalyzer analyzer;
+  private static LogicalPlanner logicalPlanner;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+
+    conf = util.getConfiguration();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    TPCH tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadOutSchema();
+    for (String table : tpch.getTableNames()) {
+      TableMeta m = TCatUtil.newTableMeta(tpch.getSchema(table), 
CatalogProtos.StoreType.CSV);
+      TableDesc d = TCatUtil.newTableDesc(table, m, new Path("file:///"));
+      catalog.addTable(d);
+    }
+
+    analyzer = new QueryAnalyzer(catalog);
+    logicalPlanner = new LogicalPlanner(catalog);
+
+    StorageManager sm  = new StorageManager(conf);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    planner = new GlobalPlanner(conf, catalog, sm, 
dispatcher.getEventHandler());
+  }
+
+  public static void tearDown() {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public void testNextBlock() throws Exception {
+    PlanningContext context = analyzer.parse(
+        "select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, 
s_phone, s_comment, ps_supplycost, " +
+            "r_name, p_type, p_size " +
+            "from region join nation on n_regionkey = r_regionkey and r_name = 
'AMERICA' " +
+            "join supplier on s_nationkey = n_nationkey " +
+            "join partsupp on s_suppkey = ps_suppkey " +
+            "join part on p_partkey = ps_partkey and p_type like '%BRASS' and 
p_size = 15");
+    LogicalNode logicalPlan = logicalPlanner.createPlan(context);
+    MasterPlan plan = planner.build(QueryIdFactory.newQueryId(), 
(LogicalRootNode) logicalPlan);
+
+    ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
+
+    int count = 0;
+    while(cursor.hasNext()) {
+      cursor.nextBlock();
+      count++;
+    }
+
+    // 4 input relations, 4 join, and 1 projection = 10 execution blocks
+    assertEquals(10, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java 
b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
index ca3e870..2de54b3 100644
--- 
a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
+++ 
b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 import tajo.QueryId;
 import tajo.SubQueryId;
 import tajo.TestQueryUnitId;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.util.TUtil;
 import tajo.util.TajoIdUtils;
 
@@ -47,7 +48,7 @@ public class TestRepartitioner {
 
     Collection<URI> uris = Repartitioner.
         createHashFetchURL(hostName + ":" + port, sid, partitionId,
-            SubQuery.PARTITION_TYPE.HASH, intermediateEntries);
+            PartitionType.HASH, intermediateEntries);
 
     List<String> taList = TUtil.newList();
     for (URI uri : uris) {

Reply via email to