Updated Branches: refs/heads/master 6cf96bd4e -> fef3dd509
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java deleted file mode 100644 index 4abeefb..0000000 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package tajo.master; - -import com.google.common.collect.Lists; - -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; - -public class SchedulerUtils { - - public static class MapComparatorBySize implements Comparator<Map>{ - - @Override - public int compare(Map m1, Map m2) { - return m1.size() - m2.size(); - } - } - - public static List<Map> sortListOfMapsBySize( - final List<Map> maplist) { - Map[] arr = new Map[maplist.size()]; - arr = maplist.toArray(arr); - Arrays.sort(arr, new MapComparatorBySize()); - - List<Map> newlist = Lists.newArrayList(arr); - return newlist; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java index 754f267..ff73334 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java @@ -22,14 +22,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.state.*; +import tajo.QueryIdFactory; import tajo.QueryUnitId; import tajo.SubQueryId; import tajo.catalog.*; @@ -43,6 +44,7 @@ import tajo.engine.planner.PlannerUtil; import tajo.engine.planner.logical.*; 
import tajo.master.QueryMaster.QueryContext; import tajo.master.event.*; +import tajo.storage.Fragment; import tajo.storage.StorageManager; import tajo.util.IndexUtil; @@ -57,38 +59,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static tajo.conf.TajoConf.ConfVars; +/** + * SubQuery is an instance of an ExecutionBlock. + */ public class SubQuery implements EventHandler<SubQueryEvent> { private static final Log LOG = LogFactory.getLog(SubQuery.class); - public enum PARTITION_TYPE { - /** for hash partitioning */ - HASH, - LIST, - /** for map-side join */ - BROADCAST, - /** for range partitioning */ - RANGE - } - - private SubQueryId id; - private LogicalNode plan = null; - private StoreTableNode store = null; - private List<ScanNode> scanlist = null; - private SubQuery next; - private Map<ScanNode, SubQuery> childSubQueries; - private PARTITION_TYPE outputType; - private boolean hasJoinPlan; - private boolean hasUnionPlan; + private ExecutionBlock block; private Priority priority; private TableStat stats; EventHandler eventHandler; final StorageManager sm; - private final GlobalPlanner planner; - private boolean isLeafQuery = false; TaskSchedulerImpl taskScheduler; QueryContext queryContext; - private Clock clock; private long startTime; private long finishTime; @@ -106,7 +90,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { new StateMachineFactory <SubQuery, SubQueryState, SubQueryEventType, SubQueryEvent> (SubQueryState.NEW) - .addTransition(SubQueryState.NEW, EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED), + .addTransition(SubQueryState.NEW, + EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED), SubQueryEventType.SQ_INIT, new InitAndRequestContainer()) .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED, @@ -148,20 +133,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private int completedTaskCount = 0; - public 
SubQuery(SubQueryId id, StorageManager sm, GlobalPlanner planner) { - this.id = id; - childSubQueries = new HashMap<ScanNode, SubQuery>(); - scanlist = new ArrayList<ScanNode>(); - hasJoinPlan = false; - hasUnionPlan = false; + public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) { + this.queryContext = context; + this.block = block; this.sm = sm; - this.planner = planner; + this.eventHandler = context.getEventHandler(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); - - stateMachine = stateMachineFactory.make(this); } @@ -190,72 +170,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - public void setQueryContext(QueryContext context) { - this.queryContext = context; - } - - public void setClock(Clock clock) { - this.clock = clock; - } - - public void setEventHandler(EventHandler eventHandler) { - this.eventHandler = eventHandler; - } - - public boolean isLeafQuery() { - return this.isLeafQuery; - } - - public void setLeafQuery() { - this.isLeafQuery = true; + public ExecutionBlock getBlock() { + return block; } public void addTask(QueryUnit task) { tasks.put(task.getId(), task); } - - public void setOutputType(PARTITION_TYPE type) { - this.outputType = type; - } - - public GlobalPlanner getPlanner() { - return planner; - } - - public void setLogicalPlan(LogicalNode plan) { - hasJoinPlan = false; - Preconditions.checkArgument(plan.getType() == ExprType.STORE - || plan.getType() == ExprType.CREATE_INDEX); - - this.plan = plan; - if (plan instanceof StoreTableNode) { - store = (StoreTableNode) plan; - } else { - store = (StoreTableNode) ((IndexWriteNode)plan).getSubNode(); - } - - LogicalNode node = plan; - ArrayList<LogicalNode> s = new ArrayList<LogicalNode>(); - s.add(node); - while (!s.isEmpty()) { - node = s.remove(s.size()-1); - if (node instanceof UnaryNode) { - UnaryNode unary = (UnaryNode) node; - s.add(s.size(), 
unary.getSubNode()); - } else if (node instanceof BinaryNode) { - BinaryNode binary = (BinaryNode) node; - if (binary.getType() == ExprType.JOIN) { - hasJoinPlan = true; - } else if (binary.getType() == ExprType.UNION) { - hasUnionPlan = true; - } - s.add(s.size(), binary.getOuterNode()); - s.add(s.size(), binary.getInnerNode()); - } else if (node instanceof ScanNode) { - scanlist.add((ScanNode)node); - } - } - } public void abortSubQuery(SubQueryState finalState) { // TODO - @@ -271,37 +192,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return this.stateMachine; } - public boolean hasJoinPlan() { - return this.hasJoinPlan; - } - - public boolean hasUnionPlan() { - return this.hasUnionPlan; - } - - public void setParentQuery(SubQuery next) { - this.next = next; - } - - public void addChildQuery(ScanNode prevscan, SubQuery prev) { - childSubQueries.put(prevscan, prev); - } - - public void addChildQueries(Map<ScanNode, SubQuery> prevs) { - this.childSubQueries.putAll(prevs); - } - - public void setQueryUnits(List<QueryUnit> queryUnits) { - for (QueryUnit task: queryUnits) { - tasks.put(task.getId(), task); - } - } - - public void removeChildQuery(ScanNode scan) { - scanlist.remove(scan); - this.childSubQueries.remove(scan); - } - public void setPriority(int priority) { if (this.priority == null) { this.priority = new Priority(priority); @@ -316,56 +206,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> { this.stats = stat; } - public SubQuery getParentQuery() { - return this.next; - } - - public boolean hasChildQuery() { - return !this.childSubQueries.isEmpty(); - } - - public Iterator<SubQuery> getChildIterator() { - return this.childSubQueries.values().iterator(); - } - - public Collection<SubQuery> getChildQueries() { - return this.childSubQueries.values(); - } - - public Map<ScanNode, SubQuery> getChildMaps() { - return this.childSubQueries; - } - public SubQuery getChildQuery(ScanNode scanForChild) { - return 
this.childSubQueries.get(scanForChild); - } - - public String getOutputName() { - return this.store.getTableName(); - } - - public PARTITION_TYPE getOutputType() { - return this.outputType; - } - - public Schema getOutputSchema() { - return this.store.getOutSchema(); - } - - public StoreTableNode getStoreTableNode() { - return this.store; - } - - public ScanNode[] getScanNodes() { - return this.scanlist.toArray(new ScanNode[scanlist.size()]); - } - - public LogicalNode getLogicalPlan() { - return this.plan; + return queryContext.getSubQuery(block.getChildBlock(scanForChild).getId()); } public SubQueryId getId() { - return this.id; + return block.getId(); } public QueryUnit[] getQueryUnits() { @@ -387,13 +233,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(this.id); -/* sb.append(" plan: " + plan.toString()); - sb.append("next: " + next + " childSubQueries:"); - Iterator<SubQuery> it = getChildIterator(); - while (it.hasNext()) { - sb.append(" " + it.next()); - }*/ + sb.append(this.getId()); return sb.toString(); } @@ -401,18 +241,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> { public boolean equals(Object o) { if (o instanceof SubQuery) { SubQuery other = (SubQuery)o; - return this.id.equals(other.getId()); + return getId().equals(other.getId()); } return false; } @Override public int hashCode() { - return this.id.hashCode(); + return getId().hashCode(); } public int compareTo(SubQuery other) { - return this.id.compareTo(other.id); + return getId().compareTo(other.getId()); } public SubQueryState getState() { @@ -439,8 +279,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> { int numBlocks = 0, numPartitions = 0; List<ColumnStat> columnStats = Lists.newArrayList(); - for (SubQuery child : unit.getChildQueries()) { - childStat = child.getStats(); + Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator(); + while 
(it.hasNext()) { + ExecutionBlock block = it.next(); + SubQuery childSubQuery = unit.queryContext.getSubQuery(block.getId()); + childStat = childSubQuery.getStats(); avgRows += childStat.getAvgRows(); columnStats.addAll(childStat.getColumnStats()); numBlocks += childStat.getNumBlocks(); @@ -448,6 +291,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { numPartitions += childStat.getNumPartitions(); numRows += childStat.getNumRows(); } + stat.setColumnStats(columnStats); stat.setNumBlocks(numBlocks); stat.setNumBytes(numBytes); @@ -458,7 +302,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } public void cleanUp() { - if (hasUnionPlan()) { + if (block.hasUnion()) { try { // write meta and continue TableStat stat = generateUnionStat(this); @@ -469,35 +313,38 @@ public class SubQuery implements EventHandler<SubQueryEvent> { e.printStackTrace(); } } else { + LOG.info("SubQuery: " + getId() + " sets TableStat"); TableStat stat = generateStat(); - setStats(stat); try { writeStat(this, stat); } catch (IOException e) { } } - finishTime = clock.getTime(); + finishTime = queryContext.getClock().getTime(); } - private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> { + private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery, + SubQueryEvent, SubQueryState> { @Override public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { - subQuery.startTime = subQuery.clock.getTime(); + subQuery.startTime = subQuery.queryContext.getClock().getTime(); subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.queryContext); subQuery.taskScheduler.init(subQuery.queryContext.getConf()); subQuery.taskScheduler.start(); + ExecutionBlock execBlock = subQuery.getBlock(); + try { // if subquery is dummy, which means it requires only a logical step // instead of actual query. An 'union all' is an example of // a dummy subquery. 
- if (subQuery.hasUnionPlan()) { + if (execBlock.hasUnion()) { subQuery.finishUnionUnit(); subQuery.cleanUp(); - TableMeta meta = new TableMetaImpl(subQuery.getOutputSchema(), + TableMeta meta = new TableMetaImpl(execBlock.getOutputSchema(), StoreType.CSV, new Options(), subQuery.getStats()); subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(), meta)); @@ -505,35 +352,39 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } else { QueryUnit [] tasks; // TODO - should be improved - if (subQuery.isLeafQuery() && subQuery.getScanNodes().length == 1) { - SubQuery parent = subQuery.getParentQuery(); + if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { + // if parent is join, this subquery is for partitioning data. - if (parent != null) { + if (execBlock.hasParentBlock()) { int numTasks = calculatePartitionNum(subQuery); - subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, numTasks); + Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks); } - tasks = subQuery.getPlanner().createLeafTasks(subQuery); - } else if (subQuery.getScanNodes().length > 1) { - SubQuery parent = subQuery.getParentQuery(); + tasks = createLeafTasks(subQuery); + } else if (execBlock.getScanNodes().length > 1) { // if parent is join, this subquery is for partitioning data. - if (parent != null) { + if (execBlock.hasParentBlock()) { int numTasks = calculatePartitionNum(subQuery); - subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, numTasks); + Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks); + } + + if (subQuery.getId().getId() == 15) { + System.out.println("error point!"); } + tasks = Repartitioner.createJoinTasks(subQuery); } else { - SubQuery parent = subQuery.getParentQuery(); // if parent is join, this subquery is for partitioning data. 
- if (parent != null) { + if (execBlock.hasParentBlock()) { int partitionNum = calculatePartitionNum(subQuery); - subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, partitionNum); + Repartitioner.setPartitionNumberForTwoPhase(subQuery, partitionNum); } int numTasks = getNonLeafTaskNum(subQuery); - tasks = Repartitioner.createNonLeafTask(subQuery, - subQuery.getChildIterator().next(), numTasks); + SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId(); + SubQuery child = subQuery.queryContext.getSubQuery(childId); + tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks); } for (QueryUnit task : tasks) { subQuery.addTask(task); @@ -543,7 +394,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { // if there is no tasks if (subQuery.tasks.size() == 0) { subQuery.cleanUp(); - TableMeta meta = toTableMeta(subQuery.getStoreTableNode()); + TableMeta meta = toTableMeta(execBlock.getStoreTableNode()); meta.setStat(subQuery.getStats()); subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(), meta)); @@ -565,10 +416,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> { org.apache.hadoop.yarn.api.records.Priority priority = RecordFactoryProvider.getRecordFactory(null).newRecordInstance( org.apache.hadoop.yarn.api.records.Priority.class); - priority.setPriority(100 - subQuery.getPriority().get()); + priority.setPriority(subQuery.getPriority().get()); ContainerAllocationEvent event = new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ, - subQuery.getId(), priority, resource, numRequest, subQuery.isLeafQuery(), 0.0f); + subQuery.getId(), priority, resource, numRequest, execBlock.isLeafBlock(), 0.0f); subQuery.eventHandler.handle(event); } } @@ -582,6 +433,48 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return SubQueryState.FAILED; } } + + public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException { + ExecutionBlock execBlock = 
subQuery.getBlock(); + ScanNode[] scans = execBlock.getScanNodes(); + Preconditions.checkArgument(scans.length == 1, "Must be Scan Query"); + TableMeta meta; + Path inputPath; + + ScanNode scan = scans[0]; + TableDesc desc = subQuery.queryContext.getCatalog().getTableDesc(scan.getTableId()); + inputPath = desc.getPath(); + meta = desc.getMeta(); + + // TODO - should be change the inner directory + Path oldPath = new Path(inputPath, "data"); + FileSystem fs = inputPath.getFileSystem(subQuery.queryContext.getConf()); + if (fs.exists(oldPath)) { + inputPath = oldPath; + } + List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath); + + QueryUnit queryUnit; + List<QueryUnit> queryUnits = new ArrayList<QueryUnit>(); + + int i = 0; + for (Fragment fragment : fragments) { + queryUnit = newQueryUnit(subQuery, i++); + queryUnit.setFragment(scan.getTableId(), fragment); + queryUnits.add(queryUnit); + } + + return queryUnits.toArray(new QueryUnit[queryUnits.size()]); + } + + private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) { + ExecutionBlock execBlock = subQuery.getBlock(); + QueryUnit unit = new QueryUnit( + QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(), + subQuery.eventHandler); + unit.setLogicalPlan(execBlock.getPlan()); + return unit; + } } /** @@ -593,25 +486,25 @@ public class SubQuery implements EventHandler<SubQueryEvent> { */ public static int calculatePartitionNum(SubQuery subQuery) { TajoConf conf = subQuery.queryContext.getConf(); - SubQuery parent = subQuery.getParentQuery(); + ExecutionBlock parent = subQuery.getBlock().getParentBlock(); GroupbyNode grpNode = null; if (parent != null) { grpNode = (GroupbyNode) PlannerUtil.findTopNode( - parent.getLogicalPlan(), ExprType.GROUP_BY); + parent.getPlan(), ExprType.GROUP_BY); } // Is this subquery the first step of join? 
if (parent != null && parent.getScanNodes().length == 2) { - Iterator<SubQuery> child = parent.getChildQueries().iterator(); + Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator(); // for inner - SubQuery outer = child.next(); - long outerVolume = getInputVolume(outer); + ExecutionBlock outer = child.next(); + long outerVolume = getInputVolume(subQuery.queryContext, outer); // for inner - SubQuery inner = child.next(); - long innerVolume = getInputVolume(inner); + ExecutionBlock inner = child.next(); + long innerVolume = getInputVolume(subQuery.queryContext, inner); LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576)); LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576)); @@ -631,7 +524,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { if (grpNode.getGroupingColumns().length == 0) { return 1; } else { - long volume = getInputVolume(subQuery); + long volume = getInputVolume(subQuery.queryContext, subQuery.block); int mb = (int) Math.ceil((double)volume / 1048576); LOG.info("Table's volume is approximately " + mb + " MB"); @@ -643,7 +536,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } else { LOG.info("============>>>>> Unexpected Case! 
<<<<<================"); - long volume = getInputVolume(subQuery); + long volume = getInputVolume(subQuery.queryContext, subQuery.block); int mb = (int) Math.ceil((double)volume / 1048576); LOG.info("Table's volume is approximately " + mb + " MB"); @@ -654,20 +547,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - public static long getInputVolume(SubQuery subQuery) { - CatalogService catalog = subQuery.queryContext.getCatalog(); - if (subQuery.hasChildQuery()) { - Iterator<SubQuery> it = subQuery.getChildQueries().iterator(); + public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) { + CatalogService catalog = context.getCatalog(); + if (execBlock.isLeafBlock()) { + ScanNode outerScan = execBlock.getScanNodes()[0]; + TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat(); + return stat.getNumBytes(); + } else { long aggregatedVolume = 0; - while(it.hasNext()) { - aggregatedVolume += it.next().getStats().getNumBytes(); + for (ExecutionBlock childBlock : execBlock.getChildBlocks()) { + SubQuery subquery = context.getSubQuery(childBlock.getId()); + aggregatedVolume += subquery.getStats().getNumBytes(); } return aggregatedVolume; - } else { - ScanNode outerScan = subQuery.getScanNodes()[0]; - TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat(); - return stat.getNumBytes(); } } @@ -679,7 +572,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { */ public static int getNonLeafTaskNum(SubQuery subQuery) { // Getting intermediate data size - long volume = getInputVolume(subQuery); + long volume = getInputVolume(subQuery.queryContext, subQuery.getBlock()); int mb = (int) Math.ceil((double)volume / 1048576); LOG.info("Table's volume is approximately " + mb + " MB"); @@ -766,6 +659,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { // TODO - records succeeded, failed, killed completed task // TODO - records metrics + ExecutionBlock 
execBlock = subQuery.getBlock(); + for (Entry<ContainerId, Container> entry : subQuery.containers.entrySet()) { subQuery.eventHandler.handle(new TaskRunnerStopEvent(subQuery.getId(), entry.getValue())); @@ -773,13 +668,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> { subQuery.cleanUp(); subQuery.taskScheduler.stop(); - StoreTableNode storeTableNode = subQuery.getStoreTableNode(); + StoreTableNode storeTableNode = execBlock.getStoreTableNode(); TableMeta meta = toTableMeta(storeTableNode); meta.setStat(subQuery.getStats()); subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(), meta)); - subQuery.finishTime = subQuery.clock.getTime(); + subQuery.finishTime = subQuery.queryContext.getClock().getTime(); } } @@ -809,15 +704,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private void writeStat(SubQuery subQuery, TableStat stat) throws IOException { + ExecutionBlock execBlock = subQuery.getBlock(); - if (subQuery.getLogicalPlan().getType() == ExprType.CREATE_INDEX) { - IndexWriteNode index = (IndexWriteNode) subQuery.getLogicalPlan(); + if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) { + IndexWriteNode index = (IndexWriteNode) execBlock.getPlan(); Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index"); TableMeta meta; if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) { meta = sm.getTableMeta(indexPath); } else { - StoreTableNode storeTableNode = subQuery.getStoreTableNode(); + StoreTableNode storeTableNode = execBlock.getStoreTableNode(); meta = toTableMeta(storeTableNode); } String indexName = IndexUtil.getIndexName(index.getTableName(), @@ -828,10 +724,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> { sm.writeTableMeta(indexPath, meta); } else { - StoreTableNode storeTableNode = subQuery.getStoreTableNode(); + StoreTableNode storeTableNode = execBlock.getStoreTableNode(); TableMeta meta = toTableMeta(storeTableNode); meta.setStat(stat); - 
sm.writeTableMeta(sm.getTablePath(subQuery.getOutputName()), meta); + sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta); } } @@ -845,19 +741,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - private void finalizePrevSubQuery(SubQuery subQuery) - throws Exception { - SubQuery prevSubQuery; - for (ScanNode scan : subQuery.getScanNodes()) { - prevSubQuery = subQuery.getChildQuery(scan); - if (prevSubQuery.getStoreTableNode().getSubNode().getType() != ExprType.UNION) { - for (QueryUnit unit : prevSubQuery.getQueryUnits()) { - //sendCommand(unit.getLastAttempt(), CommandType.FINALIZE); - } - } - } - } - @Override public void handle(SubQueryEvent event) { //if (LOG.isDebugEnabled()) { @@ -871,14 +754,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state", e); - eventHandler.handle(new SubQueryEvent(this.id, + eventHandler.handle(new SubQueryEvent(getId(), SubQueryEventType.SQ_INTERNAL_ERROR)); } //notify the eventhandler of state change if (LOG.isDebugEnabled()) { if (oldState != getState()) { - LOG.debug(id + " SubQuery Transitioned from " + oldState + " to " + LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to " + getState()); } } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java deleted file mode 100644 index 8756dc4..0000000 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package tajo.master; - -import org.apache.hadoop.yarn.service.AbstractService; - -public class TaskContainerManager extends AbstractService { - - public TaskContainerManager() { - super(TaskContainerManager.class.getName()); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java index 3f92608..816e285 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java @@ -389,8 +389,7 @@ public class TaskSchedulerImpl extends AbstractService LOG.debug("Assigned based on * match"); QueryUnit task; - task = context.getQuery() - .getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId()); + task = context.getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId()); QueryUnitRequest taskAssign = new QueryUnitRequestImpl( attemptId, 
Lists.newArrayList(task.getAllFragments()), http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java index 93b786d..43fc1d0 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java @@ -126,7 +126,7 @@ public class WorkerListener extends AbstractService QueryUnitAttemptIdProto attemptIdProto, RpcCallback<BoolProto> done) { QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto); - context.getQuery(attemptId.getQueryId()).getContext().getQuery().getSubQuery(attemptId.getSubQueryId()). + context.getQuery(attemptId.getQueryId()).getContext().getSubQuery(attemptId.getSubQueryId()). getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId). 
resetExpireTime(); done.run(TRUE_PROTO); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java index 28ccbfe..d0fae9f 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java @@ -47,7 +47,7 @@ import tajo.engine.planner.logical.StoreTableNode; import tajo.engine.planner.physical.PhysicalExec; import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface; import tajo.ipc.protocolrecords.QueryUnitRequest; -import tajo.master.SubQuery.PARTITION_TYPE; +import tajo.master.ExecutionBlock.PartitionType; import tajo.rpc.NullCallback; import tajo.storage.Fragment; import tajo.storage.StorageUtil; @@ -101,7 +101,7 @@ public class Task { private AtomicBoolean progressFlag = new AtomicBoolean(false); // TODO - to be refactored - private PARTITION_TYPE partitionType = null; + private PartitionType partitionType = null; private Schema finalSchema = null; private TupleComparator sortComp = null; @@ -153,7 +153,7 @@ public class Task { context.setInterQuery(); StoreTableNode store = (StoreTableNode) plan; this.partitionType = store.getPartitionType(); - if (partitionType == PARTITION_TYPE.RANGE) { + if (partitionType == PartitionType.RANGE) { SortNode sortNode = (SortNode) store.getSubNode(); this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys()); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java index 2af74bb..be8a7f1 100644 --- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java +++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java @@ -46,9 +46,9 @@ import tajo.engine.planner.LogicalPlanner; import tajo.engine.planner.PlanningContext; import tajo.engine.planner.global.MasterPlan; import tajo.engine.planner.logical.*; +import tajo.master.ExecutionBlock; +import tajo.master.ExecutionBlock.PartitionType; import tajo.master.GlobalPlanner; -import tajo.master.SubQuery; -import tajo.master.SubQuery.PARTITION_TYPE; import tajo.master.TajoMaster; import tajo.storage.*; @@ -159,11 +159,11 @@ public class TestGlobalQueryPlanner { MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan1); - SubQuery unit = globalPlan.getRoot(); - assertFalse(unit.hasChildQuery()); - assertEquals(PARTITION_TYPE.LIST, unit.getOutputType()); + ExecutionBlock unit = globalPlan.getRoot(); + assertFalse(unit.hasChildBlock()); + assertEquals(PartitionType.LIST, unit.getPartitionType()); - LogicalNode plan2 = unit.getLogicalPlan(); + LogicalNode plan2 = unit.getPlan(); assertEquals(ExprType.STORE, plan2.getType()); assertEquals(ExprType.SCAN, ((StoreTableNode)plan2).getSubNode().getType()); } @@ -178,20 +178,20 @@ public class TestGlobalQueryPlanner { MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan); - SubQuery next, prev; + ExecutionBlock next, prev; next = globalPlan.getRoot(); - assertTrue(next.hasChildQuery()); - assertEquals(PARTITION_TYPE.LIST, next.getOutputType()); + assertTrue(next.hasChildBlock()); + assertEquals(PartitionType.LIST, next.getPartitionType()); for (ScanNode scan : next.getScanNodes()) { assertTrue(scan.isLocal()); } 
assertFalse(next.getStoreTableNode().isLocal()); - Iterator<SubQuery> it= next.getChildIterator(); + Iterator<ExecutionBlock> it= next.getChildBlocks().iterator(); prev = it.next(); - assertFalse(prev.hasChildQuery()); - assertEquals(PARTITION_TYPE.HASH, prev.getOutputType()); + assertFalse(prev.hasChildBlock()); + assertEquals(PartitionType.HASH, prev.getPartitionType()); assertTrue(prev.getStoreTableNode().isLocal()); assertFalse(it.hasNext()); @@ -216,26 +216,26 @@ public class TestGlobalQueryPlanner { MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan); - SubQuery next, prev; + ExecutionBlock next, prev; next = globalPlan.getRoot(); assertEquals(ExprType.PROJECTION, next.getStoreTableNode().getSubNode().getType()); - assertTrue(next.hasChildQuery()); - assertEquals(PARTITION_TYPE.LIST, next.getOutputType()); - Iterator<SubQuery> it= next.getChildIterator(); + assertTrue(next.hasChildBlock()); + assertEquals(PartitionType.LIST, next.getPartitionType()); + Iterator<ExecutionBlock> it= next.getChildBlocks().iterator(); prev = it.next(); assertEquals(ExprType.SORT, prev.getStoreTableNode().getSubNode().getType()); - assertTrue(prev.hasChildQuery()); - assertEquals(PARTITION_TYPE.LIST, prev.getOutputType()); - it= prev.getChildIterator(); + assertTrue(prev.hasChildBlock()); + assertEquals(PartitionType.LIST, prev.getPartitionType()); + it= prev.getChildBlocks().iterator(); next = prev; prev = it.next(); - assertFalse(prev.hasChildQuery()); - assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType()); + assertFalse(prev.hasChildBlock()); + assertEquals(PartitionType.RANGE, prev.getPartitionType()); assertFalse(it.hasNext()); ScanNode []scans = prev.getScanNodes(); @@ -259,59 +259,59 @@ public class TestGlobalQueryPlanner { MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan); - SubQuery next, prev; + ExecutionBlock next, prev; // the second phase of the sort next = globalPlan.getRoot(); - assertTrue(next.hasChildQuery()); - 
assertEquals(PARTITION_TYPE.LIST, next.getOutputType()); + assertTrue(next.hasChildBlock()); + assertEquals(PartitionType.LIST, next.getPartitionType()); assertEquals(ExprType.PROJECTION, next.getStoreTableNode().getSubNode().getType()); ScanNode []scans = next.getScanNodes(); assertEquals(1, scans.length); - Iterator<SubQuery> it= next.getChildIterator(); + Iterator<ExecutionBlock> it= next.getChildBlocks().iterator(); prev = it.next(); assertEquals(ExprType.SORT, prev.getStoreTableNode().getSubNode().getType()); - assertEquals(PARTITION_TYPE.LIST, prev.getOutputType()); + assertEquals(PartitionType.LIST, prev.getPartitionType()); scans = prev.getScanNodes(); assertEquals(1, scans.length); - it= prev.getChildIterator(); + it= prev.getChildBlocks().iterator(); // the first phase of the sort prev = it.next(); assertEquals(ExprType.SORT, prev.getStoreTableNode().getSubNode().getType()); assertEquals(scans[0].getInSchema(), prev.getOutputSchema()); - assertTrue(prev.hasChildQuery()); - assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType()); + assertTrue(prev.hasChildBlock()); + assertEquals(PartitionType.RANGE, prev.getPartitionType()); assertFalse(it.hasNext()); scans = prev.getScanNodes(); assertEquals(1, scans.length); next = prev; - it= next.getChildIterator(); + it= next.getChildBlocks().iterator(); // the second phase of the join prev = it.next(); assertEquals(ExprType.JOIN, prev.getStoreTableNode().getSubNode().getType()); assertEquals(scans[0].getInSchema(), prev.getOutputSchema()); - assertTrue(prev.hasChildQuery()); - assertEquals(PARTITION_TYPE.LIST, prev.getOutputType()); + assertTrue(prev.hasChildBlock()); + assertEquals(PartitionType.LIST, prev.getPartitionType()); assertFalse(it.hasNext()); scans = prev.getScanNodes(); assertEquals(2, scans.length); next = prev; - it= next.getChildIterator(); + it= next.getChildBlocks().iterator(); // the first phase of the join prev = it.next(); assertEquals(ExprType.SCAN, 
prev.getStoreTableNode().getSubNode().getType()); - assertFalse(prev.hasChildQuery()); - assertEquals(PARTITION_TYPE.HASH, prev.getOutputType()); + assertFalse(prev.hasChildBlock()); + assertEquals(PartitionType.HASH, prev.getPartitionType()); assertEquals(1, prev.getScanNodes().length); prev = it.next(); assertEquals(ExprType.SCAN, prev.getStoreTableNode().getSubNode().getType()); - assertFalse(prev.hasChildQuery()); - assertEquals(PARTITION_TYPE.HASH, prev.getOutputType()); + assertFalse(prev.hasChildBlock()); + assertEquals(PartitionType.HASH, prev.getPartitionType()); assertEquals(1, prev.getScanNodes().length); assertFalse(it.hasNext()); } @@ -325,15 +325,15 @@ public class TestGlobalQueryPlanner { MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan); - SubQuery unit = globalPlan.getRoot(); + ExecutionBlock unit = globalPlan.getRoot(); StoreTableNode store = unit.getStoreTableNode(); assertEquals(ExprType.JOIN, store.getSubNode().getType()); - assertTrue(unit.hasChildQuery()); + assertTrue(unit.hasChildBlock()); ScanNode [] scans = unit.getScanNodes(); assertEquals(2, scans.length); - SubQuery prev; + ExecutionBlock prev; for (ScanNode scan : scans) { - prev = unit.getChildQuery(scan); + prev = unit.getChildBlock(scan); store = prev.getStoreTableNode(); assertEquals(ExprType.SCAN, store.getSubNode().getType()); } @@ -348,14 +348,14 @@ public class TestGlobalQueryPlanner { MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan); - SubQuery unit = globalPlan.getRoot(); + ExecutionBlock unit = globalPlan.getRoot(); StoreTableNode store = unit.getStoreTableNode(); assertEquals(ExprType.PROJECTION, store.getSubNode().getType()); ScanNode[] scans = unit.getScanNodes(); assertEquals(1, scans.length); - unit = unit.getChildQuery(scans[0]); + unit = unit.getChildBlock(scans[0]); store = unit.getStoreTableNode(); assertEquals(ExprType.UNION, store.getSubNode().getType()); UnionNode union = (UnionNode) store.getSubNode(); @@ -367,11 
+367,11 @@ public class TestGlobalQueryPlanner { union = (UnionNode) union.getInnerNode(); assertEquals(ExprType.SCAN, union.getOuterNode().getType()); assertEquals(ExprType.SCAN, union.getInnerNode().getType()); - assertTrue(unit.hasChildQuery()); + assertTrue(unit.hasChildBlock()); String tableId = ""; for (ScanNode scan : unit.getScanNodes()) { - SubQuery prev = unit.getChildQuery(scan); + ExecutionBlock prev = unit.getChildBlock(scan); store = prev.getStoreTableNode(); assertEquals(ExprType.GROUP_BY, store.getSubNode().getType()); GroupbyNode groupby = (GroupbyNode) store.getSubNode(); @@ -382,7 +382,7 @@ public class TestGlobalQueryPlanner { assertEquals(tableId, store.getTableName()); } assertEquals(1, prev.getScanNodes().length); - prev = prev.getChildQuery(prev.getScanNodes()[0]); + prev = prev.getChildBlock(prev.getScanNodes()[0]); store = prev.getStoreTableNode(); assertEquals(ExprType.GROUP_BY, store.getSubNode().getType()); groupby = (GroupbyNode) store.getSubNode(); @@ -486,13 +486,13 @@ public class TestGlobalQueryPlanner { MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan); - SubQuery second, first, mid; + ExecutionBlock second, first, mid; ScanNode secondScan, firstScan, midScan; second = globalPlan.getRoot(); assertTrue(second.getScanNodes().length == 1); - first = second.getChildQuery(second.getScanNodes()[0]); + first = second.getChildBlock(second.getScanNodes()[0]); GroupbyNode firstGroupby, secondGroupby, midGroupby; secondGroupby = (GroupbyNode) second.getStoreTableNode().getSubNode(); @@ -510,10 +510,10 @@ public class TestGlobalQueryPlanner { midScan = mid.getScanNodes()[0]; firstScan = first.getScanNodes()[0]; - assertTrue(first.getParentQuery().equals(mid)); - assertTrue(mid.getParentQuery().equals(second)); - assertTrue(second.getChildQuery(secondScan).equals(mid)); - assertTrue(mid.getChildQuery(midScan).equals(first)); + assertTrue(first.getParentBlock().equals(mid)); + 
assertTrue(mid.getParentBlock().equals(second)); + assertTrue(second.getChildBlock(secondScan).equals(mid)); + assertTrue(mid.getChildBlock(midScan).equals(first)); assertEquals(first.getOutputName(), midScan.getTableId()); assertEquals(first.getOutputSchema(), midScan.getInSchema()); assertEquals(mid.getOutputName(), secondScan.getTableId()); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java index fae28f7..c0d0dd2 100644 --- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java +++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java @@ -43,8 +43,8 @@ import tajo.engine.planner.LogicalOptimizer; import tajo.engine.planner.LogicalPlanner; import tajo.engine.planner.PlanningContext; import tajo.engine.planner.logical.*; +import tajo.master.ExecutionBlock; import tajo.master.GlobalPlanner; -import tajo.master.SubQuery; import tajo.storage.*; import java.io.IOException; @@ -144,7 +144,7 @@ public class TestGlobalQueryOptimizer { (LogicalRootNode) plan); globalPlan = optimizer.optimize(globalPlan); - SubQuery unit = globalPlan.getRoot(); + ExecutionBlock unit = globalPlan.getRoot(); StoreTableNode store = unit.getStoreTableNode(); assertEquals(ExprType.PROJECTION, store.getSubNode().getType()); ProjectionNode proj = (ProjectionNode) store.getSubNode(); @@ -153,16 +153,16 @@ public class TestGlobalQueryOptimizer { assertEquals(ExprType.SCAN, sort.getSubNode().getType()); ScanNode scan = (ScanNode) sort.getSubNode(); - assertTrue(unit.hasChildQuery()); - unit = 
unit.getChildQuery(scan); + assertTrue(unit.hasChildBlock()); + unit = unit.getChildBlock(scan); store = unit.getStoreTableNode(); assertEquals(ExprType.SORT, store.getSubNode().getType()); sort = (SortNode) store.getSubNode(); assertEquals(ExprType.JOIN, sort.getSubNode().getType()); - assertTrue(unit.hasChildQuery()); + assertTrue(unit.hasChildBlock()); for (ScanNode prevscan : unit.getScanNodes()) { - SubQuery prev = unit.getChildQuery(prevscan); + ExecutionBlock prev = unit.getChildBlock(prevscan); store = prev.getStoreTableNode(); assertEquals(ExprType.SCAN, store.getSubNode().getType()); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java index 6b8496a..8b5dee5 100644 --- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -46,7 +46,7 @@ import tajo.engine.planner.logical.LogicalNode; import tajo.engine.planner.logical.LogicalRootNode; import tajo.engine.planner.logical.StoreTableNode; import tajo.engine.planner.logical.UnionNode; -import tajo.master.SubQuery; +import tajo.master.ExecutionBlock.PartitionType; import tajo.master.TajoMaster; import tajo.storage.*; import tajo.storage.index.bst.BSTIndex; @@ -424,7 +424,7 @@ public class TestPhysicalPlanner { Column key1 = new Column("score.deptName", DataType.STRING); Column key2 = new Column("score.class", DataType.STRING); StoreTableNode storeNode = new StoreTableNode("partition"); - storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[]{key1, key2}, numPartitions); 
+ storeNode.setPartitions(PartitionType.HASH, new Column[]{key1, key2}, numPartitions); PlannerUtil.insertNode(plan, storeNode); plan = LogicalOptimizer.optimize(context, plan); @@ -482,7 +482,7 @@ public class TestPhysicalPlanner { int numPartitions = 1; StoreTableNode storeNode = new StoreTableNode("emptyset"); - storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[] {}, numPartitions); + storeNode.setPartitions(PartitionType.HASH, new Column[] {}, numPartitions); PlannerUtil.insertNode(plan, storeNode); plan = LogicalOptimizer.optimize(context, plan); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java new file mode 100644 index 0000000..c6a5d43 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java @@ -0,0 +1,101 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package tajo.master; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.junit.BeforeClass; +import org.junit.Test; +import tajo.QueryIdFactory; +import tajo.TajoTestingCluster; +import tajo.benchmark.TPCH; +import tajo.catalog.CatalogService; +import tajo.catalog.TCatUtil; +import tajo.catalog.TableDesc; +import tajo.catalog.TableMeta; +import tajo.catalog.proto.CatalogProtos; +import tajo.conf.TajoConf; +import tajo.engine.parser.QueryAnalyzer; +import tajo.engine.planner.LogicalPlanner; +import tajo.engine.planner.PlanningContext; +import tajo.engine.planner.global.MasterPlan; +import tajo.engine.planner.logical.LogicalNode; +import tajo.engine.planner.logical.LogicalRootNode; +import tajo.storage.StorageManager; + +import static org.junit.Assert.assertEquals; + +public class TestExecutionBlockCursor { + private static TajoTestingCluster util; + private static TajoConf conf; + private static CatalogService catalog; + private static GlobalPlanner planner; + private static QueryAnalyzer analyzer; + private static LogicalPlanner logicalPlanner; + + @BeforeClass + public static void setUp() throws Exception { + util = new TajoTestingCluster(); + util.startCatalogCluster(); + + conf = util.getConfiguration(); + catalog = util.getMiniCatalogCluster().getCatalog(); + TPCH tpch = new TPCH(); + tpch.loadSchemas(); + tpch.loadOutSchema(); + for (String table : tpch.getTableNames()) { + TableMeta m = TCatUtil.newTableMeta(tpch.getSchema(table), CatalogProtos.StoreType.CSV); + TableDesc d = TCatUtil.newTableDesc(table, m, new Path("file:///")); + catalog.addTable(d); + } + + analyzer = new QueryAnalyzer(catalog); + logicalPlanner = new LogicalPlanner(catalog); + + StorageManager sm = new StorageManager(conf); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + planner = new GlobalPlanner(conf, catalog, sm, dispatcher.getEventHandler()); + } + + public static void 
tearDown() { + util.shutdownCatalogCluster(); + } + + @Test + public void testNextBlock() throws Exception { + PlanningContext context = analyzer.parse( + "select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, ps_supplycost, " + + "r_name, p_type, p_size " + + "from region join nation on n_regionkey = r_regionkey and r_name = 'AMERICA' " + + "join supplier on s_nationkey = n_nationkey " + + "join partsupp on s_suppkey = ps_suppkey " + + "join part on p_partkey = ps_partkey and p_type like '%BRASS' and p_size = 15"); + LogicalNode logicalPlan = logicalPlanner.createPlan(context); + MasterPlan plan = planner.build(QueryIdFactory.newQueryId(), (LogicalRootNode) logicalPlan); + + ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan); + + int count = 0; + while(cursor.hasNext()) { + cursor.nextBlock(); + count++; + } + + // 5 input relations, 4 joins, and 1 projection = 10 execution blocks + assertEquals(10, count); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java index ca3e870..2de54b3 100644 --- a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java +++ b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java @@ -23,6 +23,7 @@ import org.junit.Test; import tajo.QueryId; import tajo.SubQueryId; import tajo.TestQueryUnitId; +import tajo.master.ExecutionBlock.PartitionType; import tajo.util.TUtil; import tajo.util.TajoIdUtils; @@ -47,7 +48,7 @@ public class TestRepartitioner { Collection<URI> uris = Repartitioner. 
createHashFetchURL(hostName + ":" + port, sid, partitionId, - SubQuery.PARTITION_TYPE.HASH, intermediateEntries); + PartitionType.HASH, intermediateEntries); List<String> taList = TUtil.newList(); for (URI uri : uris) {
