Repository: hive Updated Branches: refs/heads/llap e36f1fc4c -> d6b3a1cfd
HIVE-10475. LLAP: Minor fixes after tez api enhancements for dag completion. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d6b3a1cf Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d6b3a1cf Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d6b3a1cf Branch: refs/heads/llap Commit: d6b3a1cfd9653bc0127af246376c0bd6bd335858 Parents: e36f1fc Author: Siddharth Seth <ss...@apache.org> Authored: Thu Apr 23 17:02:05 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Thu Apr 23 17:02:05 2015 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/LlapTaskReporter.java | 24 +++++++++++--------- .../dag/app/rm/LlapTaskSchedulerService.java | 2 +- .../hive/llap/daemon/MiniLlapCluster.java | 5 ++++ 3 files changed, 19 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d6b3a1cf/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 1f25f76..ef596ac 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -231,7 +231,7 @@ public class LlapTaskReporter implements TaskReporterInterface { eventsToSend.drainTo(events); if (!task.isTaskDone() && !task.hadFatalError()) { - TezCounters counters = null; + boolean sendCounters = false; /** * Increasing the heartbeat interval can delay the delivery of events. Sending just updated * records would save CPU in DAG AM, but certain counters are updated very frequently. Until @@ -239,11 +239,10 @@ public class LlapTaskReporter implements TaskReporterInterface { */ // Not completely accurate, since OOB heartbeats could go out. if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) { - counters = task.getCounters(); + sendCounters = true; prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get(); } - updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()), - updateEventMetadata); + updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata); events.add(updateEvent); } @@ -321,13 +320,17 @@ public class LlapTaskReporter implements TaskReporterInterface { * indicates an exception somewhere in the AM. */ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), - task.getProgress()), updateEventMetadata); + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata); return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; } + private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) { + return new TaskStatusUpdateEvent((sendCounters ? task.getCounters() : null), + task.getProgress(), task.getTaskStatistics()); + } + /** * Sends out final events for task failure. * @param taskAttemptID @@ -342,8 +345,7 @@ public class LlapTaskReporter implements TaskReporterInterface { */ private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { - TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), - task.getProgress()), updateEventMetadata); + TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata); if (diagnostics == null) { diagnostics = ExceptionUtils.getStackTrace(t); } else { @@ -382,18 +384,18 @@ public class LlapTaskReporter implements TaskReporterInterface { } } - public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { + public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { return currentCallable.taskSucceeded(taskAttemptID); } @Override - public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, + public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta); } @Override - public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) { + public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) { currentCallable.addEvents(taskAttemptID, events); } http://git-wip-us.apache.org/repos/asf/hive/blob/d6b3a1cf/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 3a827c3..8100ece 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -312,7 +312,7 @@ public class LlapTaskSchedulerService extends TaskSchedulerService { } @Override - public void resetMatchLocalityForAllHeldContainers() { + public void dagComplete() { // This is effectively DAG completed, and can be used to reset statistics being tracked. LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats); dagCounter.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/hive/blob/d6b3a1cf/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index 9fd4ba6..ac8dcba 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -30,6 +30,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; public class MiniLlapCluster extends AbstractService { private static final Log LOG = LogFactory.getLog(MiniLlapCluster.class); @@ -138,6 +139,10 @@ public class MiniLlapCluster extends AbstractService { numExecutorsPerService); clusterSpecificConfiguration.setLong( LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, execBytesPerService); + // Optimize local fetch does not work with LLAP due to different local directories + // used by containers and LLAP + clusterSpecificConfiguration + .setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); } @Override