TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally on JDK 1.7. (jinho)
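For background on the JDK 1.7 failure: Java 7 replaced the sort behind
Collections.sort with TimSort, which throws IllegalArgumentException
("Comparison method violates its general contract!") when a comparator returns
inconsistent answers while the sort is running. The old allocateRackTask
comparator re-read the live remainTasksNum counters, which other scheduler
threads update concurrently, which is consistent with the occasional failures
in the title. The standalone sketch below reproduces that failure mode; the
class and field names are illustrative, not code from this patch.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative only: sorts on keys that another thread mutates mid-sort.
    // On JDK 1.7, TimSort may detect the inconsistency and throw
    // "Comparison method violates its general contract!".
    public class UnstableSortSketch {
      static class Mapping {
        final AtomicInteger remainTasksNum;
        Mapping(int n) { remainTasksNum = new AtomicInteger(n); }
      }

      public static void main(String[] args) {
        final List<Mapping> mappings = new ArrayList<Mapping>();
        final Random rnd = new Random();
        for (int i = 0; i < 100000; i++) {
          mappings.add(new Mapping(rnd.nextInt(100)));
        }

        // Stands in for scheduler threads finishing tasks concurrently.
        Thread mutator = new Thread() {
          public void run() {
            while (!isInterrupted()) {
              mappings.get(rnd.nextInt(mappings.size())).remainTasksNum.decrementAndGet();
            }
          }
        };
        mutator.start();

        try {
          Collections.sort(mappings, new Comparator<Mapping>() {
            public int compare(Mapping v1, Mapping v2) {
              // Re-reads a live counter on every call, so two comparisons of
              // the same pair can disagree, violating the comparator contract.
              return Integer.valueOf(v2.remainTasksNum.get())
                  .compareTo(Integer.valueOf(v1.remainTasksNum.get()));
            }
          });
        } catch (IllegalArgumentException e) {
          System.err.println(e.getMessage()); // occasional failure on JDK 1.7
        } finally {
          mutator.interrupt();
        }
      }
    }

The patch below reduces that window by snapshotting the mappings with
Lists.newArrayList, sorting under the scheduledRequests lock, and comparing
the counters with an explicit three-way if/else instead of boxed
Integer.compareTo.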
Closes #144


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1b3d51e1
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1b3d51e1
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1b3d51e1

Branch: refs/heads/block_iteration
Commit: 1b3d51e149039db86d8c65f1fe3f0c8953b6faf4
Parents: 621d914
Author: jhkim <[email protected]>
Authored: Sat Sep 20 16:58:55 2014 +0900
Committer: jhkim <[email protected]>
Committed: Sat Sep 20 16:58:55 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/master/DefaultTaskScheduler.java       | 49 +++++++++++++-------
 2 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1b3d51e1/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2d10e9c..995cd27 100644
--- a/CHANGES
+++ b/CHANGES
@@ -143,6 +143,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally
+    on JDK 1.7. (jinho)
+
     TAJO-1056: Wrong resource release or wrong task scheduling. (jinho)
 
     TAJO-1050: RPC client does not retry during connecting.


http://git-wip-us.apache.org/repos/asf/tajo/blob/1b3d51e1/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index c5cf430..62d4892 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.master;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -95,10 +96,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           synchronized (schedulingThread){
             schedulingThread.wait(100);
           }
+          schedule();
         } catch (InterruptedException e) {
           break;
+        } catch (Throwable e) {
+          LOG.fatal(e.getMessage(), e);
+          break;
         }
-        schedule();
       }
       LOG.info("TaskScheduler schedulingThread stopped");
     }
@@ -452,7 +456,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
         if (!this.getHost().equals(location.getHost())) {
           HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
-          volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+          if (volumeMapping != null) {
+            volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+          }
         }
       }
     }
@@ -589,11 +595,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     // if the task is not included in leafTasks and nonLeafTasks.
     private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
-    private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
-    private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping =
-        new HashMap<String, HashSet<QueryUnitAttemptId>>();
+    private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
+    private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
 
-    private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
+    private synchronized void addLeafTask(QueryUnitAttemptScheduleEvent event) {
       QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
       List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
 
@@ -646,8 +651,10 @@
       HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
       if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
-        while (hostVolumeMapping.getRemainingLocalTaskSize() > 0) {
+        for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) {
           QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+
+          if(attemptId == null) break; //find remaining local task
 
           if (leafTasks.contains(attemptId)) {
             leafTasks.remove(attemptId);
@@ -663,22 +670,30 @@
     private QueryUnitAttemptId allocateRackTask(String host) {
-      List<HostVolumeMapping> remainingTasks = new ArrayList<HostVolumeMapping>(leafTaskHostMapping.values());
+      List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
 
       String rack = RackResolver.resolve(host).getNetworkLocation();
       QueryUnitAttemptId attemptId = null;
 
       if (remainingTasks.size() > 0) {
-        //find largest remaining task of other host in rack
-        Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
-          @Override
-          public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
-            // descending remaining tasks
-            return Integer.valueOf(v2.remainTasksNum.get()).compareTo(Integer.valueOf(v1.remainTasksNum.get()));
-          }
-        });
+        synchronized (scheduledRequests) {
+          //find largest remaining task of other host in rack
+          Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+            @Override
+            public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+              // descending remaining tasks
+              if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) {
+                return 1;
+              } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) {
+                return 0;
+              } else {
+                return -1;
+              }
+            }
+          });
+        }
       }
 
       for (HostVolumeMapping tasks : remainingTasks) {
-        while (tasks.getRemainingLocalTaskSize() > 0){
+        for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) {
           QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
 
           if (tId == null) break;
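A note on the map swap above: replacing the plain HashMaps behind
leafTaskHostMapping and leafTasksRackMapping with Guava's
Maps.newConcurrentMap() lets allocateRackTask snapshot and iterate the host
mapping while the now-synchronized addLeafTask mutates it; iterating a shared
HashMap during modification is undefined and typically fails with a
ConcurrentModificationException. A minimal sketch of the pattern, with
hypothetical names:

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;

    import java.util.List;
    import java.util.concurrent.ConcurrentMap;

    // Illustrative only: a ConcurrentMap's views are weakly consistent, so
    // copying values() while another thread inserts never throws
    // ConcurrentModificationException, unlike a shared HashMap.
    public class ConcurrentHostMappingSketch {
      private final ConcurrentMap<String, Integer> hostMapping = Maps.newConcurrentMap();

      public synchronized void addTask(String host) {
        Integer prev = hostMapping.putIfAbsent(host, 1);
        if (prev != null) {
          hostMapping.put(host, prev + 1); // read-modify-write guarded by this lock
        }
      }

      public List<Integer> snapshotRemaining() {
        // Safe to call while addTask runs: the copy sees a weakly consistent
        // view instead of failing, mirroring Lists.newArrayList(...values())
        // in the patched allocateRackTask.
        return Lists.newArrayList(hostMapping.values());
      }
    }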
