TAJO-789: Improve shuffle URI. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8321d263 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8321d263 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8321d263 Branch: refs/heads/window_function Commit: 8321d263e39987aa3bd075c8da163ee093cad420 Parents: 71f394d Author: jinossy <[email protected]> Authored: Mon Apr 28 20:22:40 2014 +0900 Committer: jinossy <[email protected]> Committed: Mon Apr 28 20:22:40 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/engine/query/QueryUnitRequest.java | 6 +- .../tajo/engine/query/QueryUnitRequestImpl.java | 39 ++-- .../tajo/master/DefaultTaskScheduler.java | 10 +- .../apache/tajo/master/FetchScheduleEvent.java | 8 +- .../apache/tajo/master/LazyTaskScheduler.java | 8 +- .../apache/tajo/master/ScheduledFetches.java | 11 +- .../tajo/master/querymaster/QueryUnit.java | 103 +++++---- .../master/querymaster/QueryUnitAttempt.java | 4 +- .../tajo/master/querymaster/Repartitioner.java | 224 +++++++++---------- .../tajo/master/querymaster/SubQuery.java | 4 +- .../java/org/apache/tajo/worker/FetchImpl.java | 201 +++++++++++++++++ .../java/org/apache/tajo/worker/Fetcher.java | 2 +- .../main/java/org/apache/tajo/worker/Task.java | 39 ++-- .../src/main/proto/TajoWorkerProtocol.proto | 20 +- .../main/resources/webapps/worker/queryunit.jsp | 9 +- .../apache/tajo/master/TestRepartitioner.java | 21 +- 17 files changed, 475 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 13f908a..2134b54 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-789: Improve shuffle URI. (jinho) + TAJO-769: A minor improvements for HCatalogStore (Fengdong Yu via hyunsik) TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java index 383a787..dc9a63d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java @@ -27,8 +27,8 @@ import org.apache.tajo.common.ProtoObject; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.worker.FetchImpl; -import java.net.URI; import java.util.List; public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> { @@ -40,8 +40,8 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn public String getSerializedData(); public boolean isInterQuery(); public void setInterQuery(); - public void addFetch(String name, URI uri); - public List<TajoWorkerProtocol.Fetch> getFetches(); + public void addFetch(String name, FetchImpl fetch); + public List<FetchImpl> getFetches(); public boolean shouldDie(); public void setShouldDie(); public QueryContext getQueryContext(); http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java index d4006e0..f1af2ff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java @@ -21,11 +21,11 @@ package org.apache.tajo.engine.query; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; -import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch; +import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder; +import org.apache.tajo.worker.FetchImpl; -import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -40,7 +40,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest { private boolean clusteredOutput; private String serializedData; // logical node private Boolean interQuery; - private List<Fetch> fetches; + private List<FetchImpl> fetches; private Boolean shouldDie; private QueryContext queryContext; private DataChannel dataChannel; @@ -177,16 +177,13 @@ public class QueryUnitRequestImpl implements QueryUnitRequest { maybeInitBuilder(); this.interQuery = true; } - - public void addFetch(String name, URI uri) { - maybeInitBuilder(); - initFetches(); - fetches.add( - Fetch.newBuilder() - .setName(name) - .setUrls(uri.toString()).build()); - - } + + public void addFetch(String name, FetchImpl fetch) { + maybeInitBuilder(); + initFetches(); + fetch.setName(name); + fetches.add(fetch); + } public QueryContext getQueryContext() { QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; @@ -236,7 +233,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest { return this.enforcer; } - public List<Fetch> getFetches() { + public List<FetchImpl> getFetches() { initFetches(); return this.fetches; @@ -247,9 +244,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest { return; } QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - this.fetches = new ArrayList<Fetch>(); - for(Fetch fetch : p.getFetchesList()) { - fetches.add(fetch); + this.fetches = new ArrayList<FetchImpl>(); + for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) { + fetches.add(new FetchImpl(fetch)); } } @@ -300,9 +297,11 @@ public class QueryUnitRequestImpl implements QueryUnitRequest { if (this.interQuery != null) { builder.setInterQuery(this.interQuery); } - if (this.fetches != null) { - builder.addAllFetches(this.fetches); - } + if (this.fetches != null) { + for (int i = 0; i < fetches.size(); i++) { + builder.addFetches(fetches.get(i).getProto()); + } + } if (this.shouldDie != null) { builder.setShouldDie(this.shouldDie); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 9978670..5bfac8b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -43,8 +43,8 @@ import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.worker.FetchImpl; -import java.net.URI; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.LinkedBlockingQueue; @@ -202,11 +202,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } else if (event instanceof FetchScheduleEvent) { FetchScheduleEvent castEvent = (FetchScheduleEvent) event; - Map<String, List<URI>> fetches = castEvent.getFetches(); + Map<String, List<FetchImpl>> fetches = castEvent.getFetches(); QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(); QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++); scheduledObjectNum++; - for (Entry<String, List<URI>> eachFetch : fetches.entrySet()) { + for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) { task.addFetches(eachFetch.getKey(), eachFetch.getValue()); task.addFragment(fragmentsForNonLeafTask[0], true); if (fragmentsForNonLeafTask[1] != null) { @@ -874,9 +874,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { taskAssign.setInterQuery(); } for (ScanNode scan : task.getScanNodes()) { - Collection<URI> fetches = task.getFetch(scan); + Collection<FetchImpl> fetches = task.getFetch(scan); if (fetches != null) { - for (URI fetch : fetches) { + for (FetchImpl fetch : fetches) { taskAssign.addFetch(scan.getTableName(), fetch); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java index 561f980..21e376c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java @@ -20,21 +20,21 @@ package org.apache.tajo.master; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.master.event.TaskSchedulerEvent; +import org.apache.tajo.worker.FetchImpl; -import java.net.URI; import java.util.List; import java.util.Map; public class FetchScheduleEvent extends TaskSchedulerEvent { - private final Map<String, List<URI>> fetches; + private final Map<String, List<FetchImpl>> fetches; public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId, - final Map<String, List<URI>> fetches) { + final Map<String, List<FetchImpl>> fetches) { super(eventType, blockId); this.fetches = fetches; } - public Map<String, List<URI>> getFetches() { + public Map<String, List<FetchImpl>> getFetches() { return fetches; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index dd82f28..6552998 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -40,9 +40,9 @@ import org.apache.tajo.master.querymaster.QueryUnitAttempt; import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.worker.FetchImpl; import java.io.IOException; -import java.net.URI; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; @@ -484,11 +484,11 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { } if (!context.isLeafQuery()) { - Map<String, List<URI>> fetch = scheduledFetches.getNextFetch(); + Map<String, List<FetchImpl>> fetch = scheduledFetches.getNextFetch(); scheduledFetches.popNextFetch(); - for (Entry<String, List<URI>> fetchEntry : fetch.entrySet()) { - for (URI eachValue : fetchEntry.getValue()) { + for (Entry<String, List<FetchImpl>> fetchEntry : fetch.entrySet()) { + for (FetchImpl eachValue : fetchEntry.getValue()) { taskAssign.addFetch(fetchEntry.getKey(), eachValue); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java index 9b7dc22..b05572b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java @@ -18,15 +18,16 @@ package org.apache.tajo.master; -import java.net.URI; +import org.apache.tajo.worker.FetchImpl; + import java.util.ArrayList; import java.util.List; import java.util.Map; public class ScheduledFetches { - private List<Map<String, List<URI>>> fetches = new ArrayList<Map<String, List<URI>>>(); + private List<Map<String, List<FetchImpl>>> fetches = new ArrayList<Map<String, List<FetchImpl>>>(); - public void addFetch(Map<String, List<URI>> fetch) { + public void addFetch(Map<String, List<FetchImpl>> fetch) { this.fetches.add(fetch); } @@ -34,11 +35,11 @@ public class ScheduledFetches { return fetches.size() > 0; } - public Map<String, List<URI>> getNextFetch() { + public Map<String, List<FetchImpl>> getNextFetch() { return hasNextFetch() ? fetches.get(0) : null; } - public Map<String, List<URI>> popNextFetch() { + public Map<String, List<FetchImpl>> popNextFetch() { return hasNextFetch() ? fetches.remove(0) : null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 34686da..27625b4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -40,6 +40,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.worker.FetchImpl; import java.net.URI; import java.util.*; @@ -63,7 +64,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { private List<ScanNode> scan; private Map<String, Set<FragmentProto>> fragMap; - private Map<String, Set<URI>> fetchMap; + private Map<String, Set<FetchImpl>> fetchMap; private int totalFragmentNum; @@ -269,18 +270,18 @@ public class QueryUnit implements EventHandler<TaskEvent> { return succeededHost; } - public void addFetches(String tableId, Collection<URI> urilist) { - Set<URI> uris; + public void addFetches(String tableId, Collection<FetchImpl> fetches) { + Set<FetchImpl> fetchSet; if (fetchMap.containsKey(tableId)) { - uris = fetchMap.get(tableId); + fetchSet = fetchMap.get(tableId); } else { - uris = Sets.newHashSet(); + fetchSet = Sets.newHashSet(); } - uris.addAll(urilist); - fetchMap.put(tableId, uris); + fetchSet.addAll(fetches); + fetchMap.put(tableId, fetchSet); } - public void setFetches(Map<String, Set<URI>> fetches) { + public void setFetches(Map<String, Set<FetchImpl>> fetches) { this.fetchMap.clear(); this.fetchMap.putAll(fetches); } @@ -301,19 +302,19 @@ public class QueryUnit implements EventHandler<TaskEvent> { return taskId; } - public Collection<URI> getFetchHosts(String tableId) { + public Collection<FetchImpl> getFetchHosts(String tableId) { return fetchMap.get(tableId); } - public Collection<Set<URI>> getFetches() { + public Collection<Set<FetchImpl>> getFetches() { return fetchMap.values(); } - public Map<String, Set<URI>> getFetchMap() { + public Map<String, Set<FetchImpl>> getFetchMap() { return fetchMap; } - public Collection<URI> getFetch(ScanNode scan) { + public Collection<FetchImpl> getFetch(ScanNode scan) { return this.fetchMap.get(scan.getTableName()); } @@ -323,21 +324,24 @@ public class QueryUnit implements EventHandler<TaskEvent> { @Override public String toString() { - String str = new String(plan.getType() + " \n"); + StringBuilder builder = new StringBuilder(); + builder.append(plan.getType() + " \n"); for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) { - str += e.getKey() + " : "; + builder.append(e.getKey()).append(" : "); for (FragmentProto fragment : e.getValue()) { - str += fragment + ", "; + builder.append(fragment).append(", "); } } - for (Entry<String, Set<URI>> e : fetchMap.entrySet()) { - str += e.getKey() + " : "; - for (URI t : e.getValue()) { - str += t + " "; + for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) { + builder.append(e.getKey()).append(" : "); + for (FetchImpl t : e.getValue()) { + for (URI uri : t.getURIs()){ + builder.append(uri).append(" "); + } } } - return str; + return builder.toString(); } public void setStats(TableStats stats) { @@ -612,20 +616,52 @@ public class QueryUnit implements EventHandler<TaskEvent> { return this.intermediateData; } + public static class PullHost { + String host; + int port; + public PullHost(String pullServerAddr, int pullServerPort){ + this.host = pullServerAddr; + this.port = pullServerPort; + } + public String getHost() { + return host; + } + + public int getPort() { + return this.port; + } + + public String getPullAddress() { + return host + ":" + port; + } + + @Override + public int hashCode() { + return Objects.hashCode(host, port); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PullHost) { + PullHost other = (PullHost) obj; + return host.equals(other.host) && port == other.port; + } + + return false; + } + } + public static class IntermediateEntry { int taskId; int attemptId; int partId; - String pullHost; - int port; + PullHost host; - public IntermediateEntry(int taskId, int attemptId, int partId, - String pullServerAddr, int pullServerPort) { + public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) { this.taskId = taskId; this.attemptId = attemptId; this.partId = partId; - this.pullHost = pullServerAddr; - this.port = pullServerPort; + this.host = host; } public int getTaskId() { @@ -640,22 +676,13 @@ public class QueryUnit implements EventHandler<TaskEvent> { return this.partId; } - public String getPullHost() { - return this.pullHost; - } - - public int getPullPort() { - return port; - } - - public String getPullAddress() { - return pullHost + ":" + port; + public PullHost getPullHost() { + return this.host; } @Override public int hashCode() { - return Objects.hashCode(taskId, attemptId, partId, pullHost, port); + return Objects.hashCode(taskId, partId, attemptId, host); } - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java index b69742c..c3aae67 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java @@ -33,6 +33,7 @@ import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; +import org.apache.tajo.master.querymaster.QueryUnit.PullHost; import java.util.ArrayList; import java.util.EnumSet; @@ -273,9 +274,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { if (report.getShuffleFileOutputsCount() > 0) { this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList()); + PullHost host = new PullHost(getHost(), getPullServerPort()); for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(), - getId().getId(), p.getPartId(), getHost(), getPullServerPort()); + getId().getId(), p.getPartId(), host); partitions.add(entry); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 31d433d..3a2e79f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -39,6 +39,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.exception.InternalException; +import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TaskSchedulerContext; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.storage.AbstractStorageManager; @@ -47,6 +48,7 @@ import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.worker.FetchImpl; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -55,7 +57,6 @@ import java.net.URI; import java.util.*; import java.util.Map.Entry; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE; @@ -66,7 +67,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE; public class Repartitioner { private static final Log LOG = LogFactory.getLog(Repartitioner.class); - private static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900; + private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900; private final static String UNKNOWN_HOST = "unknown"; public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery) @@ -282,23 +283,16 @@ public class Repartitioner { private static void addJoinShuffle(SubQuery subQuery, int partitionId, Map<String, List<IntermediateEntry>> grouppedPartitions) { - Map<String, List<URI>> fetches = new HashMap<String, List<URI>>(); + Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>(); for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) { - Map<String, List<IntermediateEntry>> requests; + Collection<FetchImpl> requests; if (grouppedPartitions.containsKey(execBlock.getId().toString())) { - requests = mergeHashShuffleRequest(grouppedPartitions.get(execBlock.getId().toString())); + requests = mergeShuffleRequest(execBlock.getId(), partitionId, HASH_SHUFFLE, + grouppedPartitions.get(execBlock.getId().toString())); } else { return; } - Set<URI> fetchURIs = TUtil.newHashSet(); - for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) { - Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(), - execBlock.getId(), - partitionId, HASH_SHUFFLE, - requestPerNode.getValue()); - fetchURIs.addAll(uris); - } - fetches.put(execBlock.getId().toString(), Lists.newArrayList(fetchURIs)); + fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests)); } SubQuery.scheduleFetches(subQuery, fetches); } @@ -309,17 +303,23 @@ public class Repartitioner { * * @return key: pullserver's address, value: a list of requests */ - private static Map<String, List<IntermediateEntry>> mergeHashShuffleRequest(List<IntermediateEntry> partitions) { - Map<String, List<IntermediateEntry>> mergedPartitions = new HashMap<String, List<IntermediateEntry>>(); + private static Collection<FetchImpl> mergeShuffleRequest(ExecutionBlockId ebid, int partitionId, + TajoWorkerProtocol.ShuffleType type, + List<IntermediateEntry> partitions) { + Map<QueryUnit.PullHost, FetchImpl> mergedPartitions = new HashMap<QueryUnit.PullHost, FetchImpl>(); + for (IntermediateEntry partition : partitions) { - if (mergedPartitions.containsKey(partition.getPullAddress())) { - mergedPartitions.get(partition.getPullAddress()).add(partition); + QueryUnit.PullHost host = partition.getPullHost(); + if (mergedPartitions.containsKey(host)) { + FetchImpl fetch = mergedPartitions.get(partition.getPullHost()); + fetch.addPart(partition.getTaskId(), partition.getAttemptId()); } else { - mergedPartitions.put(partition.getPullAddress(), TUtil.newList(partition)); + FetchImpl fetch = new FetchImpl(host, type, ebid, partitionId); + fetch.addPart(partition.getTaskId(), partition.getAttemptId()); + mergedPartitions.put(partition.getPullHost(), fetch); } } - - return mergedPartitions; + return mergedPartitions.values(); } public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext, @@ -388,38 +388,39 @@ public class Repartitioner { FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); SubQuery.scheduleFragment(subQuery, dummyFragment); - List<String> basicFetchURIs = new ArrayList<String>(); + List<FetchImpl> fetches = new ArrayList<FetchImpl>(); List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId()); for (ExecutionBlock childBlock : childBlocks) { SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId()); for (QueryUnit qu : childExecSM.getQueryUnits()) { for (IntermediateEntry p : qu.getIntermediateData()) { - String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(), childBlock.getId(), p.taskId, p.attemptId); - basicFetchURIs.add(uri); + FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0); + fetch.addPart(p.getTaskId(), p.getAttemptId()); + fetches.add(fetch); } } } boolean ascendingFirstKey = sortSpecs[0].isAscending(); - SortedMap<TupleRange, Collection<URI>> map; + SortedMap<TupleRange, Collection<FetchImpl>> map; if (ascendingFirstKey) { - map = new TreeMap<TupleRange, Collection<URI>>(); + map = new TreeMap<TupleRange, Collection<FetchImpl>>(); } else { - map = new TreeMap<TupleRange, Collection<URI>>(new TupleRange.DescendingTupleRangeComparator()); + map = new TreeMap<TupleRange, Collection<FetchImpl>>(new TupleRange.DescendingTupleRangeComparator()); } - Set<URI> uris; + Set<FetchImpl> fetchSet; try { RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema); for (int i = 0; i < ranges.length; i++) { - uris = new HashSet<URI>(); - for (String uri: basicFetchURIs) { + fetchSet = new HashSet<FetchImpl>(); + for (FetchImpl fetch: fetches) { String rangeParam = TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder); - URI finalUri = URI.create(uri + "&" + rangeParam); - uris.add(finalUri); + fetch.setRangeParams(rangeParam); + fetchSet.add(fetch); } - map.put(ranges[i], uris); + map.put(ranges[i], fetchSet); } } catch (UnsupportedEncodingException e) { @@ -431,39 +432,24 @@ public class Repartitioner { schedulerContext.setEstimatedTaskNum(determinedTaskNum); } - public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<URI>> partitions, + public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<FetchImpl>> partitions, String tableName, int num) { int i; - Map<String, List<URI>>[] fetchesArray = new Map[num]; + Map<String, List<FetchImpl>>[] fetchesArray = new Map[num]; for (i = 0; i < num; i++) { - fetchesArray[i] = new HashMap<String, List<URI>>(); + fetchesArray[i] = new HashMap<String, List<FetchImpl>>(); } i = 0; - for (Entry<?, Collection<URI>> entry : partitions.entrySet()) { - Collection<URI> value = entry.getValue(); + for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) { + Collection<FetchImpl> value = entry.getValue(); TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value); if (i == num) i = 0; } - for (Map<String, List<URI>> eachFetches : fetchesArray) { + for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) { SubQuery.scheduleFetches(subQuery, eachFetches); } } - public static String createBasicFetchUri(String hostName, int port, - ExecutionBlockId childSid, - int taskId, int attemptId) { - String scheme = "http://"; - StringBuilder sb = new StringBuilder(scheme); - sb.append(hostName).append(":").append(port).append("/?") - .append("qid=").append(childSid.getQueryId().toString()) - .append("&sid=").append(childSid.getId()) - .append("&").append("ta=").append(taskId).append("_").append(attemptId) - .append("&").append("p=0") - .append("&").append("type=r"); - - return sb.toString(); - } - public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, SubQuery subQuery, DataChannel channel, int maxNum) { @@ -483,8 +469,8 @@ public class Repartitioner { fragments.add(frag); SubQuery.scheduleFragments(subQuery, fragments); - Map<String, List<IntermediateEntry>> hashedByHost; - Map<Integer, Collection<URI>> finalFetchURI = new HashMap<Integer, Collection<URI>>(); + Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost; + Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer, Collection<FetchImpl>>(); for (ExecutionBlock block : masterPlan.getChilds(execBlock)) { List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>(); @@ -496,14 +482,15 @@ public class Repartitioner { Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions); for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) { hashedByHost = hashByHost(interm.getValue()); - for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) { - Collection<URI> uris = createHashFetchURL(e.getKey(), block.getId(), - interm.getKey(), channel.getShuffleType(), e.getValue()); + for (Entry<QueryUnit.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) { - if (finalFetchURI.containsKey(interm.getKey())) { - finalFetchURI.get(interm.getKey()).addAll(uris); + FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(), + block.getId(), interm.getKey(), e.getValue()); + + if (finalFetches.containsKey(interm.getKey())) { + finalFetches.get(interm.getKey()).add(fetch); } else { - finalFetchURI.put(interm.getKey(), TUtil.newList(uris)); + finalFetches.put(interm.getKey(), TUtil.newList(fetch)); } } } @@ -511,8 +498,8 @@ public class Repartitioner { GroupbyNode groupby = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY); // get a proper number of tasks - int determinedTaskNum = Math.min(maxNum, finalFetchURI.size()); - LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetchURI.size()); + int determinedTaskNum = Math.min(maxNum, finalFetches.size()); + LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size()); if (groupby != null && groupby.getGroupingColumns().length == 0) { determinedTaskNum = 1; LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); @@ -521,61 +508,70 @@ public class Repartitioner { // set the proper number of tasks to the estimated task num schedulerContext.setEstimatedTaskNum(determinedTaskNum); // divide fetch uris into the the proper number of tasks in a round robin manner. - scheduleFetchesByRoundRobin(subQuery, finalFetchURI, scan.getTableName(), determinedTaskNum); + scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum); LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum); } - public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid, - int partitionId, ShuffleType type, List<IntermediateEntry> entries) { + + public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) { String scheme = "http://"; + StringBuilder urlPrefix = new StringBuilder(scheme); - urlPrefix.append(hostAndPort).append("/?") - .append("qid=").append(ebid.getQueryId().toString()) - .append("&sid=").append(ebid.getId()) - .append("&p=").append(partitionId) + urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?") + .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString()) + .append("&sid=").append(fetch.getExecutionBlockId().getId()) + .append("&p=").append(fetch.getPartitionId()) .append("&type="); - if (type == HASH_SHUFFLE) { + if (fetch.getType() == HASH_SHUFFLE) { urlPrefix.append("h"); - } else if (type == RANGE_SHUFFLE) { - urlPrefix.append("r"); + } else if (fetch.getType() == RANGE_SHUFFLE) { + urlPrefix.append("r").append("&").append(fetch.getRangeParams()); } - urlPrefix.append("&ta="); - - // If the get request is longer than 2000 characters, - // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long. - // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15 - // The below code transforms a long request to multiple requests. - List<String> taskIdsParams = new ArrayList<String>(); - boolean first = true; - StringBuilder taskIdListBuilder = new StringBuilder(); - for (IntermediateEntry entry: entries) { - StringBuilder taskAttemptId = new StringBuilder(); - - if (!first) { // when comma is added? - taskAttemptId.append(","); - } else { - first = false; - } - taskAttemptId.append(entry.getTaskId()).append("_"). - append(entry.getAttemptId()); - if (taskIdListBuilder.length() + taskAttemptId.length() - > HTTP_REQUEST_MAXIMUM_LENGTH) { + List<URI> fetchURLs = new ArrayList<URI>(); + if(includeParts){ + // If the get request is longer than 2000 characters, + // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long. + // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15 + // The below code transforms a long request to multiple requests. + List<String> taskIdsParams = new ArrayList<String>(); + StringBuilder taskIdListBuilder = new StringBuilder(); + List<Integer> taskIds = fetch.getTaskIds(); + List<Integer> attemptIds = fetch.getAttemptIds(); + boolean first = true; + + for (int i = 0; i < taskIds.size(); i++) { + StringBuilder taskAttemptId = new StringBuilder(); + + if (!first) { // when comma is added? + taskAttemptId.append(","); + } else { + first = false; + } + + int taskId = taskIds.get(i); + int attemptId = attemptIds.get(i); + taskAttemptId.append(taskId).append("_").append(attemptId); + + if (taskIdListBuilder.length() + taskAttemptId.length() + > HTTP_REQUEST_MAXIMUM_LENGTH) { + taskIdsParams.add(taskIdListBuilder.toString()); + taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId); + } else { + taskIdListBuilder.append(taskAttemptId); + } + } + // if the url params remain + if (taskIdListBuilder.length() > 0) { taskIdsParams.add(taskIdListBuilder.toString()); - taskIdListBuilder = new StringBuilder(entry.getTaskId() + "_" + entry.getAttemptId()); - } else { - taskIdListBuilder.append(taskAttemptId); } - } - - // if the url params remain - if (taskIdListBuilder.length() > 0) { - taskIdsParams.add(taskIdListBuilder.toString()); - } - Collection<URI> fetchURLs = new ArrayList<URI>(); - for (String param : taskIdsParams) { - fetchURLs.add(URI.create(urlPrefix + param)); + urlPrefix.append("&ta="); + for (String param : taskIdsParams) { + fetchURLs.add(URI.create(urlPrefix + param)); + } + } else { + fetchURLs.add(URI.create(urlPrefix.toString())); } return fetchURLs; @@ -594,16 +590,16 @@ public class Repartitioner { return hashed; } - public static Map<String, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) { - Map<String, List<IntermediateEntry>> hashed = new HashMap<String, List<IntermediateEntry>>(); + public static Map<QueryUnit.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) { + Map<QueryUnit.PullHost, List<IntermediateEntry>> hashed = new HashMap<QueryUnit.PullHost, List<IntermediateEntry>>(); - String hostName; + QueryUnit.PullHost host; for (IntermediateEntry entry : entries) { - hostName = entry.getPullHost() + ":" + entry.getPullPort(); - if (hashed.containsKey(hostName)) { - hashed.get(hostName).add(entry); + host = entry.getPullHost(); + if (hashed.containsKey(host)) { + hashed.get(host).add(entry); } else { - hashed.put(hostName, TUtil.newList(entry)); + hashed.put(host, TUtil.newList(entry)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 31c0efa..921bb3a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -56,9 +56,9 @@ import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.worker.FetchImpl; import java.io.IOException; -import java.net.URI; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; @@ -945,7 +945,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { subQuery.getId(), leftFragment, rightFragments)); } - public static void scheduleFetches(SubQuery subQuery, Map<String, List<URI>> fetches) { + public static void scheduleFetches(SubQuery subQuery, Map<String, List<FetchImpl>> fetches) { subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, subQuery.getId(), fetches)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java new file mode 100644 index 0000000..9d1f428 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.querymaster.QueryUnit; +import org.apache.tajo.master.querymaster.Repartitioner; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +/** + * <code>FetchImpl</code> information to indicate the locations of intermediate data. + */ +public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> { + private TajoWorkerProtocol.FetchProto.Builder builder = null; + + private QueryUnit.PullHost host; // The pull server host information + private TajoWorkerProtocol.ShuffleType type; // hash or range partition method. + private ExecutionBlockId executionBlockId; // The executionBlock id + private int partitionId; // The hash partition id + private String name; // The intermediate source name + private String rangeParams; // optional, the http parameters of range partition. (e.g., start=xx&end=yy) + private boolean hasNext = false; // optional, if true, has more taskIds + + private List<Integer> taskIds; // repeated, the task ids + private List<Integer> attemptIds; // repeated, the attempt ids + + public FetchImpl() { + builder = TajoWorkerProtocol.FetchProto.newBuilder(); + taskIds = new ArrayList<Integer>(); + attemptIds = new ArrayList<Integer>(); + } + + public FetchImpl(TajoWorkerProtocol.FetchProto proto) { + this(new QueryUnit.PullHost(proto.getHost(), proto.getPort()), + proto.getType(), + new ExecutionBlockId(proto.getExecutionBlockId()), + proto.getPartitionId(), + proto.getRangeParams(), + proto.getHasNext(), + proto.getName(), + proto.getTaskIdList(), proto.getAttemptIdList()); + } + + public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId, + int partitionId) { + this(host, type, executionBlockId, partitionId, null, false, null, + new ArrayList<Integer>(), new ArrayList<Integer>()); + } + + public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId, + int partitionId, List<QueryUnit.IntermediateEntry> intermediateEntryList) { + this(host, type, executionBlockId, partitionId, null, false, null, + new ArrayList<Integer>(), new ArrayList<Integer>()); + for (QueryUnit.IntermediateEntry entry : intermediateEntryList){ + addPart(entry.getTaskId(), entry.getAttemptId()); + } + } + + public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId, + int partitionId, String rangeParams, boolean hasNext, String name, + List<Integer> taskIds, List<Integer> attemptIds) { + this.host = host; + this.type = type; + this.executionBlockId = executionBlockId; + this.partitionId = partitionId; + this.rangeParams = rangeParams; + this.hasNext = hasNext; + this.name = name; + this.taskIds = taskIds; + this.attemptIds = attemptIds; + } + + @Override + public int hashCode() { + return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, hasNext, taskIds, attemptIds); + } + + @Override + public TajoWorkerProtocol.FetchProto getProto() { + if (builder == null) { + builder = TajoWorkerProtocol.FetchProto.newBuilder(); + } + builder.setHost(host.getHost()); + builder.setPort(host.getPort()); + builder.setType(type); + builder.setExecutionBlockId(executionBlockId.getProto()); + builder.setPartitionId(partitionId); + builder.setHasNext(hasNext); + builder.setName(name); + if (rangeParams != null && !rangeParams.isEmpty()) { + builder.setRangeParams(rangeParams); + } + + Preconditions.checkArgument(taskIds.size() == attemptIds.size()); + builder.addAllTaskId(taskIds); + builder.addAllAttemptId(attemptIds); + return builder.build(); + } + + public void addPart(int taskId, int attemptId) { + this.taskIds.add(taskId); + this.attemptIds.add(attemptId); + } + + public QueryUnit.PullHost getPullHost() { + return this.host; + } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } + + public void setExecutionBlockId(ExecutionBlockId executionBlockId) { + this.executionBlockId = executionBlockId; + } + + public int getPartitionId() { + return partitionId; + } + + public void setPartitionId(int partitionId) { + this.partitionId = partitionId; + } + + public String getRangeParams() { + return rangeParams; + } + + public void setRangeParams(String rangeParams) { + this.rangeParams = rangeParams; + } + + public boolean hasNext() { + return hasNext; + } + + public void setHasNext(boolean hasNext) { + this.hasNext = hasNext; + } + + public TajoWorkerProtocol.ShuffleType getType() { + return type; + } + + public void setType(TajoWorkerProtocol.ShuffleType type) { + this.type = type; + } + + /** + * Get the pull server URIs. + */ + public List<URI> getURIs(){ + return Repartitioner.createFetchURL(this, true); + } + + /** + * Get the pull server URIs without repeated parameters. + */ + public List<URI> getSimpleURIs(){ + return Repartitioner.createFetchURL(this, false); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public List<Integer> getTaskIds() { + return taskIds; + } + + public List<Integer> getAttemptIds() { + return attemptIds; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index bb136f7..a4836e4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -181,7 +181,7 @@ public class Fetcher { sb.append(name).append(" = ").append(value); } if (this.length == -1 && name.equals("Content-Length")) { - this.length = Long.valueOf(value); + this.length = Long.parseLong(value); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index ef52fd0..5c252fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos.TaskAttemptState; @@ -54,7 +53,6 @@ import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.ApplicationIdUtils; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import java.io.File; @@ -192,8 +190,8 @@ public class Task { LOG.info("* Fragments (num: " + request.getFragments().size() + ")"); LOG.info("* Fetches (total:" + request.getFetches().size() + ") :"); - for (Fetch f : request.getFetches()) { - LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls()); + for (FetchImpl f : request.getFetches()) { + LOG.info("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); } LOG.info("* Local task dir: " + taskDir); if(LOG.isDebugEnabled()) { @@ -622,7 +620,7 @@ public class Task { } private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, - List<Fetch> fetches) throws IOException { + List<FetchImpl> fetches) throws IOException { if (fetches.size() > 0) { @@ -639,15 +637,17 @@ public class Task { int i = 0; File storeFile; List<Fetcher> runnerList = Lists.newArrayList(); - for (Fetch f : fetches) { - storeDir = new File(inputDir.toString(), f.getName()); - if (!storeDir.exists()) { - storeDir.mkdirs(); + for (FetchImpl f : fetches) { + for (URI uri : f.getURIs()) { + storeDir = new File(inputDir.toString(), f.getName()); + if (!storeDir.exists()) { + storeDir.mkdirs(); + } + storeFile = new File(storeDir, "in_" + i); + Fetcher fetcher = new Fetcher(uri, storeFile, channelFactory); + runnerList.add(fetcher); + i++; } - storeFile = new File(storeDir, "in_" + i); - Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile, channelFactory); - runnerList.add(fetcher); - i++; } ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); return runnerList; @@ -737,19 +737,6 @@ public class Task { } } } - - public static final String FILECACHE = "filecache"; - public static final String APPCACHE = "appcache"; - public static final String USERCACHE = "usercache"; - - String fileCache; - public String getFileCacheDir() { - fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" + - ConverterUtils.toString(ApplicationIdUtils.queryIdToAppId(taskId.getQueryUnitId().getExecutionBlockId().getQueryId())) + - "/" + "output"; - return fileCache; - } - public static Path getTaskAttemptDir(QueryUnitAttemptId quid) { Path workDir = StorageUtil.concatPath( http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index 78da10f..5d4ae44 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -68,18 +68,30 @@ message QueryUnitRequestProto { required bool clusteredOutput = 4; required string serializedData = 5; optional bool interQuery = 6 [default = false]; - repeated Fetch fetches = 7; + repeated FetchProto fetches = 7; optional bool shouldDie = 8; optional KeyValueSetProto queryContext = 9; optional DataChannelProto dataChannel = 10; optional EnforcerProto enforcer = 11; } -message Fetch { - required string name = 1; - required string urls = 2; +message FetchProto { + required string host = 1; + required int32 port = 2; + required ShuffleType type = 3; + required ExecutionBlockIdProto executionBlockId = 4; + required int32 partitionId = 5; + required string name = 6; + optional string rangeParams = 7; + optional bool hasNext = 8 [default = false]; + + //repeated part + repeated int32 taskId = 9 [packed=true]; + repeated int32 attemptId = 10 [packed=true]; } + + message QueryUnitResponseProto { required string id = 1; required QueryState status = 2; http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/main/resources/webapps/worker/queryunit.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp index 3e8dfef..06dca00 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp @@ -40,6 +40,7 @@ <%@ page import="java.util.Set" %> <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> <%@ page import="org.apache.tajo.worker.TaskHistory" %> +<%@ page import="org.apache.tajo.worker.FetchImpl" %> <% String paramQueryId = request.getParameter("queryId"); @@ -108,11 +109,13 @@ String fetchInfo = ""; delim = ""; - for (Map.Entry<String, Set<URI>> e : queryUnit.getFetchMap().entrySet()) { + for (Map.Entry<String, Set<FetchImpl>> e : queryUnit.getFetchMap().entrySet()) { fetchInfo += delim + "<b>" + e.getKey() + "</b>"; delim = "<br/>"; - for (URI t : e.getValue()) { - fetchInfo += delim + t; + for (FetchImpl f : e.getValue()) { + for (URI uri : f.getSimpleURIs()){ + fetchInfo += delim + uri; + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8321d263/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 987dc2a..0ccaebe 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -23,13 +23,16 @@ import org.apache.tajo.QueryId; import org.apache.tajo.TestTajoIds; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.querymaster.QueryUnit; -import org.apache.tajo.master.querymaster.Repartitioner; import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.FetchImpl; import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.junit.Test; import java.net.URI; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import static junit.framework.Assert.assertEquals; @@ -44,12 +47,18 @@ public class TestRepartitioner { List<QueryUnit.IntermediateEntry> intermediateEntries = TUtil.newList(); for (int i = 0; i < 1000; i++) { - intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, hostName, port)); + intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port))); } - Collection<URI> uris = Repartitioner. - createHashFetchURL(hostName + ":" + port, sid, partitionId, - TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE, intermediateEntries); + FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE, + sid, partitionId, intermediateEntries); + + fetch.setName(sid.toString()); + + TajoWorkerProtocol.FetchProto proto = fetch.getProto(); + fetch = new FetchImpl(proto); + assertEquals(proto, fetch.getProto()); + List<URI> uris = fetch.getURIs(); List<String> taList = TUtil.newList(); for (URI uri : uris) {
