Repository: tajo Updated Branches: refs/heads/master b37583613 -> 6cfd448f7
TAJO-881: JOIN with union query occurs NPE. (Hyoungjun Kim via hyunsik) Closes #48 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6cfd448f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6cfd448f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6cfd448f Branch: refs/heads/master Commit: 6cfd448f7fb60a254a237230f071456f188f9179 Parents: b375836 Author: Hyunsik Choi <[email protected]> Authored: Thu Jun 26 12:08:12 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Thu Jun 26 12:08:12 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/engine/planner/enforce/Enforcer.java | 2 + .../engine/planner/global/ExecutionBlock.java | 10 + .../engine/planner/global/GlobalPlanner.java | 157 +++++++++++-- .../tajo/engine/planner/global/MasterPlan.java | 7 +- .../planner/rewrite/ProjectionPushDownRule.java | 11 +- .../tajo/master/DefaultTaskScheduler.java | 7 +- .../tajo/master/querymaster/QueryUnit.java | 12 +- .../tajo/master/querymaster/Repartitioner.java | 91 ++++---- .../tajo/engine/query/TestUnionQuery.java | 220 +++++++++++++++++++ 10 files changed, 457 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 77186e6..6389bf9 100644 --- a/CHANGES +++ b/CHANGES @@ -74,6 +74,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-881: JOIN with union query occurs NPE. (Hyoungjun Kim via hyunsik) + TAJO-884: complex join conditions should be supported in ON clause. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java index 36820cc..031569e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java @@ -311,6 +311,8 @@ public class Enforcer implements ProtoObject<EnforcerProto> { } break; case SORTED_INPUT: + SortedInputEnforce sortedInput = property.getSortedInput(); + sb.append("sorted input=" + sortedInput.getTableName()); } return sb.toString(); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index b731cec..1d14996 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -34,6 +34,9 @@ public class ExecutionBlock { private List<ScanNode> scanlist = new ArrayList<ScanNode>(); private Enforcer enforcer = new Enforcer(); + // Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId. 
+ private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<ExecutionBlockId, ExecutionBlockId>(); + private boolean hasJoinPlan; private boolean hasUnionPlan; @@ -83,6 +86,13 @@ public class ExecutionBlock { } } + public void addUnionScan(ExecutionBlockId realScanEbId, ExecutionBlockId delegatedScanEbId) { + unionScanMap.put(realScanEbId, delegatedScanEbId); + } + + public Map<ExecutionBlockId, ExecutionBlockId> getUnionScanMap() { + return unionScanMap; + } public LogicalNode getPlan() { return plan; http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 536dbd8..f12b7e2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -230,7 +231,7 @@ public class GlobalPlanner { ExecutionBlock leftBlock, ExecutionBlock rightBlock) throws PlanningException { MasterPlan masterPlan = context.plan; - ExecutionBlock currentBlock = null; + ExecutionBlock currentBlock; boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO); @@ -310,22 +311,143 @@ public class GlobalPlanner { } // symmetric repartition join - currentBlock = masterPlan.newExecutionBlock(); + boolean leftUnion = leftNode.getType() == NodeType.TABLE_SUBQUERY 
 && + ((TableSubQueryNode)leftNode).getSubQuery().getType() == NodeType.UNION; + boolean rightUnion = rightNode.getType() == NodeType.TABLE_SUBQUERY && + ((TableSubQueryNode)rightNode).getSubQuery().getType() == NodeType.UNION; + + if (leftUnion || rightUnion) { // if one of the child execution blocks is a union + /* + A join of tableC with the result of a union of tableA and tableB is expected to produce the following physical plan. + But the Union execution block is not necessary. + |-eb_0001_000006 (Terminal) + |-eb_0001_000005 (Join eb_0001_000003, eb_0001_000004) + |-eb_0001_000004 (Scan TableC) + |-eb_0001_000003 (Union TableA, TableB) + |-eb_0001_000002 (Scan TableB) + |-eb_0001_000001 (Scan TableA) + + The above plan can be changed to the following plan. + |-eb_0001_000005 (Terminal) + |-eb_0001_000003 (Join [eb_0001_000001, eb_0001_000002], eb_0001_000004) + |-eb_0001_000004 (Scan TableC) + |-eb_0001_000002 (Scan TableB) + |-eb_0001_000001 (Scan TableA) + + eb_0001_000003's left child should be eb_0001_000001 + eb_0001_000002 and its right child should be eb_0001_000004. + For this, eb_0001_000001 acts as the representative of eb_0001_000001 and eb_0001_000002. + So eb_0001_000003's left child is eb_0001_000001 + */ + Column[][] joinColumns = null; + if (joinNode.getJoinType() != JoinType.CROSS) { + // Shuffle keys must not include theta-join conditions because Tajo supports only equi-joins. + joinColumns = PlannerUtil.joinJoinKeyForEachTable(joinNode.getJoinQual(), + leftNode.getOutSchema(), rightNode.getOutSchema(), false); + } - DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true); - DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false); + if (leftUnion && !rightUnion) { // if only left is union + currentBlock = leftBlock; + context.execBlockMap.remove(leftNode.getPID()); + Column[] shuffleKeys = (joinColumns != null) ? joinColumns[0] : null; + Column[] otherSideShuffleKeys = (joinColumns != null) ? 
joinColumns[1] : null; + buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, leftBlock, rightBlock, leftNode, + shuffleKeys, otherSideShuffleKeys, true); + currentBlock.setPlan(joinNode); + } else if (!leftUnion && rightUnion) { // if only right is union + currentBlock = rightBlock; + context.execBlockMap.remove(rightNode.getPID()); + Column[] shuffleKeys = (joinColumns != null) ? joinColumns[1] : null; + Column[] otherSideShuffleKeys = (joinColumns != null) ? joinColumns[0] : null; + buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, rightBlock, leftBlock, rightNode, + shuffleKeys, otherSideShuffleKeys, false); + currentBlock.setPlan(joinNode); + } else { // if both are unions + currentBlock = leftBlock; + context.execBlockMap.remove(leftNode.getPID()); + context.execBlockMap.remove(rightNode.getPID()); + buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, leftBlock, null, leftNode, + (joinColumns != null ? joinColumns[0] : null), null, true); + buildJoinPlanWithUnionChannel(context, joinNode, currentBlock, rightBlock, null, rightNode, + (joinColumns != null ? 
joinColumns[1] : null), null, false); + currentBlock.setPlan(joinNode); + } - ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel); - ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel); + return currentBlock; + } else { + // !leftUnion && !rightUnion + currentBlock = masterPlan.newExecutionBlock(); + DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true); + DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false); - joinNode.setLeftChild(leftScan); - joinNode.setRightChild(rightScan); - currentBlock.setPlan(joinNode); + ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel); + ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel); - masterPlan.addConnect(leftChannel); - masterPlan.addConnect(rightChannel); + joinNode.setLeftChild(leftScan); + joinNode.setRightChild(rightScan); + currentBlock.setPlan(joinNode); - return currentBlock; + masterPlan.addConnect(leftChannel); + masterPlan.addConnect(rightChannel); + + return currentBlock; + } + } + + private void buildJoinPlanWithUnionChannel(GlobalPlanContext context, JoinNode joinNode, + ExecutionBlock targetBlock, + ExecutionBlock sourceBlock, + ExecutionBlock otherSideBlock, + LogicalNode childNode, + Column[] shuffleKeys, + Column[] otherSideShuffleKeys, + boolean left) { + MasterPlan masterPlan = context.getPlan(); + String subQueryRelationName = ((TableSubQueryNode)childNode).getCanonicalName(); + ExecutionBlockId dedicatedScanNodeBlock = null; + for (DataChannel channel : masterPlan.getIncomingChannels(sourceBlock.getId())) { + // If all union and right, add channel to left + if (otherSideBlock == null && !left) { + DataChannel oldChannel = channel; + masterPlan.disconnect(oldChannel.getSrcId(), oldChannel.getTargetId()); + channel = new DataChannel(oldChannel.getSrcId(), targetBlock.getId()); + } + 
channel.setSchema(childNode.getOutSchema()); + channel.setShuffleType(HASH_SHUFFLE); + channel.setShuffleOutputNum(32); + if (shuffleKeys != null) { + channel.setShuffleKeys(shuffleKeys); + } + + ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel); + scanNode.getOutSchema().setQualifier(subQueryRelationName); + if (dedicatedScanNodeBlock == null) { + dedicatedScanNodeBlock = channel.getSrcId(); + if (left) { + joinNode.setLeftChild(scanNode); + } else { + joinNode.setRightChild(scanNode); + } + } + masterPlan.addConnect(channel); + targetBlock.addUnionScan(channel.getSrcId(), dedicatedScanNodeBlock); + } + + // create other side channel + if (otherSideBlock != null) { + DataChannel otherSideChannel = new DataChannel(otherSideBlock, targetBlock, HASH_SHUFFLE, 32); + otherSideChannel.setStoreType(storeType); + if (otherSideShuffleKeys != null) { + otherSideChannel.setShuffleKeys(otherSideShuffleKeys); + } + masterPlan.addConnect(otherSideChannel); + + ScanNode scan = buildInputExecutor(masterPlan.getLogicalPlan(), otherSideChannel); + if (left) { + joinNode.setRightChild(scan); + } else { + joinNode.setLeftChild(scan); + } + } } private AggregationFunctionCallEval createSumFunction(EvalNode [] args) throws InternalException { @@ -1211,7 +1333,6 @@ public class GlobalPlanner { } } } - if (leftMostSubQueryNode != null) { // replace target column name Target[] targets = leftMostSubQueryNode.getTargets(); @@ -1221,6 +1342,16 @@ public class GlobalPlanner { throw new PlanningException("Target of a UnionNode's subquery should be FieldEval."); } int index = leftMostSubQueryNode.getInSchema().getColumnId(targets[i].getNamedColumn().getQualifiedName()); + if (index < 0) { + // If a target has alias, getNamedColumn() only returns alias + Set<Column> columns = EvalTreeUtil.findUniqueColumns(targets[i].getEvalTree()); + Column column = columns.iterator().next(); + index = leftMostSubQueryNode.getInSchema().getColumnId(column.getQualifiedName()); + } + if 
(index < 0) { + throw new PlanningException("Can't find matched Target in UnionNode's input schema: " + targets[i] + + "->" + leftMostSubQueryNode.getInSchema()); + } targetMappings[i] = index; } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index 37b0db1..a8593e5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -238,7 +238,11 @@ public class MasterPlan { if (!isLeaf(block)) { sb.append("\n[Incoming]\n"); for (DataChannel channel : getIncomingChannels(block.getId())) { - sb.append(channel).append("\n"); + sb.append(channel); + if (block.getUnionScanMap().containsKey(channel.getSrcId())) { + sb.append(", union delegated scan: ").append(block.getUnionScanMap().get(channel.getSrcId())); + } + sb.append("\n"); } } @@ -250,6 +254,7 @@ public class MasterPlan { } } + if (block.getEnforcer().getProperties().size() > 0) { sb.append("\n[Enforcers]\n"); int i = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java index 827daee..8d80d39 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java @@ -27,6 +27,7 @@ import 
org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.engine.eval.*; import org.apache.tajo.engine.planner.*; +import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.utils.SchemaUtil; import org.apache.tajo.util.TUtil; @@ -54,11 +55,15 @@ public class ProjectionPushDownRule extends public boolean isEligible(LogicalPlan plan) { LogicalNode toBeOptimized = plan.getRootBlock().getRoot(); - if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression()) { + if (PlannerUtil.checkIfDDLPlan(toBeOptimized)) { return false; } - - return true; + for (QueryBlock eachBlock: plan.getQueryBlocks()) { + if (eachBlock.hasTableExpression()) { + return true; + } + } + return false; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 94d0381..21df4e9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -30,7 +30,6 @@ import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.planner.logical.ScanNode; import org.apache.tajo.engine.query.QueryUnitRequest; import org.apache.tajo.engine.query.QueryUnitRequestImpl; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -872,11 +871,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { 
taskAssign.setInterQuery(); } - for (ScanNode scan : task.getScanNodes()) { - Collection<FetchImpl> fetches = task.getFetch(scan); + for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) { + Collection<FetchImpl> fetches = entry.getValue(); if (fetches != null) { for (FetchImpl fetch : fetches) { - taskAssign.addFetch(scan.getTableName(), fetch); + taskAssign.addFetch(entry.getKey(), fetch); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 33cf19b..6cada07 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.QueryUnitId; @@ -661,6 +662,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { } public static class IntermediateEntry { + ExecutionBlockId ebId; int taskId; int attemptId; int partId; @@ -673,6 +675,14 @@ public class QueryUnit implements EventHandler<TaskEvent> { this.host = host; } + public ExecutionBlockId getEbId() { + return ebId; + } + + public void setEbId(ExecutionBlockId ebId) { + this.ebId = ebId; + } + public int getTaskId() { return this.taskId; } @@ -691,7 +701,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { @Override public int hashCode() { - return Objects.hashCode(taskId, partId, attemptId, host); + return 
Objects.hashCode(ebId, taskId, partId, attemptId, host); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 6c000a1..6373bb4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -87,13 +87,9 @@ public class Repartitioner { for (int i = 0; i < scans.length; i++) { TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); if (tableDesc == null) { // if it is a real table stored on storage - // TODO - to be fixed (wrong directory) - ExecutionBlock [] childBlocks = new ExecutionBlock[2]; - childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0); - childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1); - tablePath = storageManager.getTablePath(scans[i].getTableName()); - stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getResultStats().getNumBytes(); + ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName()); + stats[i] = masterContext.getSubQuery(scanEBId).getResultStats().getNumBytes(); fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); } else { tablePath = tableDesc.getPath(); @@ -161,28 +157,38 @@ public class Repartitioner { } else { LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join"); // The hash map is modeling as follows: - // <Part Id, <Table Name, Intermediate Data>> - Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries = new HashMap<Integer, Map<String, List<IntermediateEntry>>>(); + // <Part Id, <EbId, Intermediate Data>> + Map<Integer, 
Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries = + new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>(); // Grouping IntermediateData by a partition key and a table name - for (ScanNode scan : scans) { - SubQuery childSubQuery = masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getCanonicalName())); - for (QueryUnit task : childSubQuery.getQueryUnits()) { + List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId()); + + // In the case of join with union, there is one ScanNode for union. + Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = execBlock.getUnionScanMap(); + for (ExecutionBlock childBlock : childBlocks) { + ExecutionBlockId scanEbId = unionScanMap.get(childBlock.getId()); + if (scanEbId == null) { + scanEbId = childBlock.getId(); + } + SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId()); + for (QueryUnit task : childExecSM.getQueryUnits()) { if (task.getIntermediateData() != null && !task.getIntermediateData().isEmpty()) { for (IntermediateEntry intermEntry : task.getIntermediateData()) { + intermEntry.setEbId(childBlock.getId()); if (hashEntries.containsKey(intermEntry.getPartId())) { - Map<String, List<IntermediateEntry>> tbNameToInterm = + Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(intermEntry.getPartId()); - if (tbNameToInterm.containsKey(scan.getCanonicalName())) { - tbNameToInterm.get(scan.getCanonicalName()).add(intermEntry); + if (tbNameToInterm.containsKey(scanEbId)) { + tbNameToInterm.get(scanEbId).add(intermEntry); } else { - tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry)); + tbNameToInterm.put(scanEbId, TUtil.newList(intermEntry)); } } else { - Map<String, List<IntermediateEntry>> tbNameToInterm = - new HashMap<String, List<IntermediateEntry>>(); - tbNameToInterm.put(scan.getCanonicalName(), TUtil.newList(intermEntry)); + Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = + new 
HashMap<ExecutionBlockId, List<IntermediateEntry>>(); + tbNameToInterm.put(scanEbId, TUtil.newList(intermEntry)); hashEntries.put(intermEntry.getPartId(), tbNameToInterm); } } @@ -190,15 +196,15 @@ public class Repartitioner { //if no intermidatedata(empty table), make empty entry int emptyPartitionId = 0; if (hashEntries.containsKey(emptyPartitionId)) { - Map<String, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(emptyPartitionId); - if (tbNameToInterm.containsKey(scan.getCanonicalName())) - tbNameToInterm.get(scan.getCanonicalName()) - .addAll(new ArrayList<IntermediateEntry>()); + Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(emptyPartitionId); + if (tbNameToInterm.containsKey(scanEbId)) + tbNameToInterm.get(scanEbId).addAll(new ArrayList<IntermediateEntry>()); else - tbNameToInterm.put(scan.getCanonicalName(), new ArrayList<IntermediateEntry>()); + tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>()); } else { - Map<String, List<IntermediateEntry>> tbNameToInterm = new HashMap<String, List<IntermediateEntry>>(); - tbNameToInterm.put(scan.getCanonicalName(), new ArrayList<IntermediateEntry>()); + Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = + new HashMap<ExecutionBlockId, List<IntermediateEntry>>(); + tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>()); hashEntries.put(emptyPartitionId, tbNameToInterm); } } @@ -234,7 +240,7 @@ public class Repartitioner { SubQuery.scheduleFragment(subQuery, fragments[0], Arrays.asList(new FileFragment[]{fragments[1]})); // Assign partitions to tasks in a round robin manner. 
 - for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry + for (Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry : hashEntries.entrySet()) { addJoinShuffle(subQuery, entry.getKey(), entry.getValue()); } @@ -321,17 +327,19 @@ public class Repartitioner { } private static void addJoinShuffle(SubQuery subQuery, int partitionId, - Map<String, List<IntermediateEntry>> grouppedPartitions) { + Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) { Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>(); for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) { - Collection<FetchImpl> requests; - if (grouppedPartitions.containsKey(execBlock.getId().toString())) { - requests = mergeShuffleRequest(execBlock.getId(), partitionId, HASH_SHUFFLE, - grouppedPartitions.get(execBlock.getId().toString())); - } else { - return; + if (grouppedPartitions.containsKey(execBlock.getId())) { + Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE, + grouppedPartitions.get(execBlock.getId())); + fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests)); } - fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests)); + } + + if (fetches.isEmpty()) { + LOG.info(subQuery.getId() + "'s " + partitionId + " partition has empty result."); + return; } SubQuery.scheduleFetches(subQuery, fetches); } @@ -342,20 +350,23 @@ public class Repartitioner { * * @return key: pullserver's address, value: a list of requests */ - private static Collection<FetchImpl> mergeShuffleRequest(ExecutionBlockId ebid, int partitionId, + private static Collection<FetchImpl> mergeShuffleRequest(int partitionId, TajoWorkerProtocol.ShuffleType type, List<IntermediateEntry> partitions) { - Map<QueryUnit.PullHost, FetchImpl> mergedPartitions = new HashMap<QueryUnit.PullHost, FetchImpl>(); + // ebId + pullhost -> FetchImpl + Map<String, FetchImpl> mergedPartitions = new 
HashMap<String, FetchImpl>(); for (IntermediateEntry partition : partitions) { - QueryUnit.PullHost host = partition.getPullHost(); - if (mergedPartitions.containsKey(host)) { + String mergedKey = partition.getEbId().toString() + "," + partition.getPullHost(); + + if (mergedPartitions.containsKey(mergedKey)) { FetchImpl fetch = mergedPartitions.get(partition.getPullHost()); fetch.addPart(partition.getTaskId(), partition.getAttemptId()); } else { - FetchImpl fetch = new FetchImpl(host, type, ebid, partitionId); + // In some cases like union each IntermediateEntry has different EBID. + FetchImpl fetch = new FetchImpl(partition.getPullHost(), type, partition.getEbId(), partitionId); fetch.addPart(partition.getTaskId(), partition.getAttemptId()); - mergedPartitions.put(partition.getPullHost(), fetch); + mergedPartitions.put(mergedKey, fetch); } } return mergedPartitions.values(); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cfd448f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java index 857cb63..1ec2c33 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java @@ -26,6 +26,8 @@ import org.junit.experimental.categories.Category; import java.sql.ResultSet; +import static org.junit.Assert.assertEquals; + /* * Notations * - S - select @@ -205,4 +207,222 @@ public class TestUnionQuery extends QueryTestCaseBase { assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testLeftUnionWithJoin() throws Exception { + // https://issues.apache.org/jira/browse/TAJO-881 + ResultSet res = executeString( + "select * from ( " + + " select a.id, b.c_name, a.code from ( " + + " select l_orderkey as id, 
'lineitem' as code from lineitem " + + " union all " + + " select o_orderkey as id, 'order' as code from orders " + + " ) a " + + " join customer b on a.id = b.c_custkey" + + ") c order by id, code" + ); + + String expected = + "id,c_name,code\n" + + "-------------------------------\n" + + "1,Customer#000000001,lineitem\n" + + "1,Customer#000000001,lineitem\n" + + "1,Customer#000000001,order\n" + + "2,Customer#000000002,lineitem\n" + + "2,Customer#000000002,order\n" + + "3,Customer#000000003,lineitem\n" + + "3,Customer#000000003,lineitem\n" + + "3,Customer#000000003,order\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } + + @Test + public final void testRightUnionWithJoin() throws Exception { + // https://issues.apache.org/jira/browse/TAJO-881 + ResultSet res = executeString( + "select * from ( " + + " select a.id, b.c_name, a.code from customer b " + + " join ( " + + " select l_orderkey as id, 'lineitem' as code from lineitem " + + " union all " + + " select o_orderkey as id, 'order' as code from orders " + + " ) a on a.id = b.c_custkey" + + ") c order by id, code" + ); + + String expected = + "id,c_name,code\n" + + "-------------------------------\n" + + "1,Customer#000000001,lineitem\n" + + "1,Customer#000000001,lineitem\n" + + "1,Customer#000000001,order\n" + + "2,Customer#000000002,lineitem\n" + + "2,Customer#000000002,order\n" + + "3,Customer#000000003,lineitem\n" + + "3,Customer#000000003,lineitem\n" + + "3,Customer#000000003,order\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } + + @Test + public final void testAllUnionWithJoin() throws Exception { + // https://issues.apache.org/jira/browse/TAJO-881 + ResultSet res = executeString( + "select * from ( " + + " select a.id, a.code as code, b.name, b.code as code2 from ( " + + " select l_orderkey as id, 'lineitem' as code from lineitem " + + " union all " + + " select o_orderkey as id, 'order' as code from orders " + + " ) a " + + " join ( " + 
+ " select c_custkey as id, c_name as name, 'customer' as code from customer " + + " union all " + + " select p_partkey as id, p_name as name, 'part' as code from part " + + " ) b on a.id = b.id" + + ") c order by id, code, code2" + ); + + String expected = + "id,code,name,code2\n" + + "-------------------------------\n" + + "1,lineitem,Customer#000000001,customer\n" + + "1,lineitem,Customer#000000001,customer\n" + + "1,lineitem,goldenrod lavender spring chocolate lace,part\n" + + "1,lineitem,goldenrod lavender spring chocolate lace,part\n" + + "1,order,Customer#000000001,customer\n" + + "1,order,goldenrod lavender spring chocolate lace,part\n" + + "2,lineitem,Customer#000000002,customer\n" + + "2,lineitem,blush thistle blue yellow saddle,part\n" + + "2,order,Customer#000000002,customer\n" + + "2,order,blush thistle blue yellow saddle,part\n" + + "3,lineitem,Customer#000000003,customer\n" + + "3,lineitem,Customer#000000003,customer\n" + + "3,lineitem,spring green yellow purple cornsilk,part\n" + + "3,lineitem,spring green yellow purple cornsilk,part\n" + + "3,order,Customer#000000003,customer\n" + + "3,order,spring green yellow purple cornsilk,part\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } + + @Test + public final void testUnionWithCrossJoin() throws Exception { + // https://issues.apache.org/jira/browse/TAJO-881 + ResultSet res = executeString( + "select * from ( " + + " select a.id, b.c_name, a.code from ( " + + " select l_orderkey as id, 'lineitem' as code from lineitem " + + " union all " + + " select o_orderkey as id, 'order' as code from orders " + + " ) a, " + + " customer b " + + ") c order by id, code, c_name" + ); + + String expected = + "id,c_name,code\n" + + "-------------------------------\n" + + "1,Customer#000000001,lineitem\n" + + "1,Customer#000000001,lineitem\n" + + "1,Customer#000000002,lineitem\n" + + "1,Customer#000000002,lineitem\n" + + "1,Customer#000000003,lineitem\n" + + 
"1,Customer#000000003,lineitem\n" + + "1,Customer#000000004,lineitem\n" + + "1,Customer#000000004,lineitem\n" + + "1,Customer#000000005,lineitem\n" + + "1,Customer#000000005,lineitem\n" + + "1,Customer#000000001,order\n" + + "1,Customer#000000002,order\n" + + "1,Customer#000000003,order\n" + + "1,Customer#000000004,order\n" + + "1,Customer#000000005,order\n" + + "2,Customer#000000001,lineitem\n" + + "2,Customer#000000002,lineitem\n" + + "2,Customer#000000003,lineitem\n" + + "2,Customer#000000004,lineitem\n" + + "2,Customer#000000005,lineitem\n" + + "2,Customer#000000001,order\n" + + "2,Customer#000000002,order\n" + + "2,Customer#000000003,order\n" + + "2,Customer#000000004,order\n" + + "2,Customer#000000005,order\n" + + "3,Customer#000000001,lineitem\n" + + "3,Customer#000000001,lineitem\n" + + "3,Customer#000000002,lineitem\n" + + "3,Customer#000000002,lineitem\n" + + "3,Customer#000000003,lineitem\n" + + "3,Customer#000000003,lineitem\n" + + "3,Customer#000000004,lineitem\n" + + "3,Customer#000000004,lineitem\n" + + "3,Customer#000000005,lineitem\n" + + "3,Customer#000000005,lineitem\n" + + "3,Customer#000000001,order\n" + + "3,Customer#000000002,order\n" + + "3,Customer#000000003,order\n" + + "3,Customer#000000004,order\n" + + "3,Customer#000000005,order\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } + + @Test + public final void testThreeJoinInUnion() throws Exception { + // https://issues.apache.org/jira/browse/TAJO-881 + ResultSet res = executeString( + "select orders.o_orderkey \n" + + "from orders\n" + + "join lineitem on orders.o_orderkey = lineitem.l_orderkey\n" + + "join customer on orders.o_custkey = customer.c_custkey\n" + + "union all \n" + + "select nation.n_nationkey from nation" + ); + String expected = + "o_orderkey\n" + + "-------------------------------\n" + + "1\n" + + "1\n" + + "2\n" + + "3\n" + + "3\n" + + "0\n" + + "1\n" + + "2\n" + + "3\n" + + "4\n" + + "5\n" + + "6\n" + + "7\n" + + "8\n" + + "9\n" + + 
"10\n" + + "11\n" + + "12\n" + + "13\n" + + "14\n" + + "15\n" + + "16\n" + + "17\n" + + "18\n" + + "19\n" + + "20\n" + + "21\n" + + "22\n" + + "23\n" + + "24\n"; + + assertEquals(expected, resultSetToString(res)); + + cleanupQuery(res); + } } \ No newline at end of file
