Repository: hive
Updated Branches:
  refs/heads/llap e36f1fc4c -> d6b3a1cfd


HIVE-10475. LLAP: Minor fixes after tez api enhancements for dag completion. 
(Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d6b3a1cf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d6b3a1cf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d6b3a1cf

Branch: refs/heads/llap
Commit: d6b3a1cfd9653bc0127af246376c0bd6bd335858
Parents: e36f1fc
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Apr 23 17:02:05 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Apr 23 17:02:05 2015 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/LlapTaskReporter.java | 24 +++++++++++---------
 .../dag/app/rm/LlapTaskSchedulerService.java    |  2 +-
 .../hive/llap/daemon/MiniLlapCluster.java       |  5 ++++
 3 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d6b3a1cf/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 1f25f76..ef596ac 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -231,7 +231,7 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
       eventsToSend.drainTo(events);
 
       if (!task.isTaskDone() && !task.hadFatalError()) {
-        TezCounters counters = null;
+        boolean sendCounters = false;
         /**
          * Increasing the heartbeat interval can delay the delivery of events. 
Sending just updated
          * records would save CPU in DAG AM, but certain counters are updated 
very frequently. Until
@@ -239,11 +239,10 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
          */
         // Not completely accurate, since OOB heartbeats could go out.
         if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * 
pollInterval >= sendCounterInterval) {
-          counters = task.getCounters();
+          sendCounters = true;
           prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
         }
-        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, 
task.getProgress()),
-            updateEventMetadata);
+        updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), 
updateEventMetadata);
         events.add(updateEvent);
       }
 
@@ -321,13 +320,17 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
      *           indicates an exception somewhere in the AM.
      */
     private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws 
IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(new 
TaskStatusUpdateEvent(task.getCounters(),
-          task.getProgress()), updateEventMetadata);
+      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), 
updateEventMetadata);
       TezEvent taskCompletedEvent = new TezEvent(new 
TaskAttemptCompletedEvent(),
           updateEventMetadata);
       return !heartbeat(Lists.newArrayList(statusUpdateEvent, 
taskCompletedEvent)).shouldDie;
     }
 
+    private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
+      return new TaskStatusUpdateEvent((sendCounters ? task.getCounters() : 
null),
+          task.getProgress(), task.getTaskStatistics());
+    }
+
     /**
      * Sends out final events for task failure.
      * @param taskAttemptID
@@ -342,8 +345,7 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
      */
     private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, 
String diagnostics,
                                EventMetaData srcMeta) throws IOException, 
TezException {
-      TezEvent statusUpdateEvent = new TezEvent(new 
TaskStatusUpdateEvent(task.getCounters(),
-          task.getProgress()), updateEventMetadata);
+      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), 
updateEventMetadata);
       if (diagnostics == null) {
         diagnostics = ExceptionUtils.getStackTrace(t);
       } else {
@@ -382,18 +384,18 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
     }
   }
 
-  public boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws 
IOException, TezException {
+  public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) 
throws IOException, TezException {
     return currentCallable.taskSucceeded(taskAttemptID);
   }
 
   @Override
-  public boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, 
String diagnostics,
+  public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, 
Throwable t, String diagnostics,
                             EventMetaData srcMeta) throws IOException, 
TezException {
     return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
   }
 
   @Override
-  public void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> 
events) {
+  public synchronized void addEvents(TezTaskAttemptID taskAttemptID, 
Collection<TezEvent> events) {
     currentCallable.addEvents(taskAttemptID, events);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/d6b3a1cf/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java 
b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 3a827c3..8100ece 100644
--- 
a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ 
b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -312,7 +312,7 @@ public class LlapTaskSchedulerService extends 
TaskSchedulerService {
   }
 
   @Override
-  public void resetMatchLocalityForAllHeldContainers() {
+  public void dagComplete() {
     // This is effectively DAG completed, and can be used to reset statistics 
being tracked.
     LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + 
dagStats);
     dagCounter.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hive/blob/d6b3a1cf/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 9fd4ba6..ac8dcba 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
 public class MiniLlapCluster extends AbstractService {
   private static final Log LOG = LogFactory.getLog(MiniLlapCluster.class);
@@ -138,6 +139,10 @@ public class MiniLlapCluster extends AbstractService {
         numExecutorsPerService);
     clusterSpecificConfiguration.setLong(
         LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, 
execBytesPerService);
+    // Optimize local fetch does not work with LLAP due to different local 
directories
+    // used by containers and LLAP
+    clusterSpecificConfiguration
+        .setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, 
false);
   }
 
   @Override

Reply via email to