Repository: hive
Updated Branches:
  refs/heads/llap 20ac70b15 -> dc7ceb4e2


HIVE-10663. LLAP: Update tez dependency. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dc7ceb4e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dc7ceb4e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dc7ceb4e

Branch: refs/heads/llap
Commit: dc7ceb4e2fa81592b35b3a67d4d2bbea22a14ea4
Parents: 20ac70b
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 8 18:46:09 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 18:46:09 2015 -0700

----------------------------------------------------------------------
 llap-client/pom.xml                             |  2 +-
 llap-server/pom.xml                             |  2 +-
 .../hive/llap/daemon/impl/LlapTaskReporter.java | 53 ++++++++++++++------
 pom.xml                                         |  2 +-
 4 files changed, 42 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 8a1a8bd..87f53b7 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
+    <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/llap-server/pom.xml
----------------------------------------------------------------------
diff --git a/llap-server/pom.xml b/llap-server/pom.xml
index 9325bd9..4fcd705 100644
--- a/llap-server/pom.xml
+++ b/llap-server/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
+    <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index f3771ea..716fb23 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -130,7 +131,7 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
     private static final float LOG_COUNTER_BACKOFF = 1.3f;
 
     private final RuntimeTask task;
-    private EventMetaData updateEventMetadata;
+    private final EventMetaData updateEventMetadata;
 
     private final LlapTaskUmbilicalProtocol umbilical;
 
@@ -141,6 +142,9 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
 
     private final AtomicLong requestCounter;
 
+    private final AtomicBoolean finalEventQueued = new AtomicBoolean(false);
+    private final AtomicBoolean askedToDie = new AtomicBoolean(false);
+
     private LinkedBlockingQueue<TezEvent> eventsToSend = new 
LinkedBlockingQueue<TezEvent>();
 
     private final ReentrantLock lock = new ReentrantLock();
@@ -204,6 +208,9 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
       }
       int pendingEventCount = eventsToSend.size();
       if (pendingEventCount > 0) {
+        // This is OK because the pending events will be sent via the 
succeeded/failed messages.
+        // TaskDone is set before taskSucceeded/taskFailed are sent out - 
which is what causes the
+        // thread to exit
         LOG.warn("Exiting TaskReporter thread with pending queue size=" + 
pendingEventCount);
       }
       return true;
@@ -245,8 +252,9 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
       }
 
       long requestId = requestCounter.incrementAndGet();
+      int fromEventId = task.getNextFromEventId();
       TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, 
containerIdStr,
-          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+          task.getTaskAttemptID(), fromEventId, maxEventsToGet);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending heartbeat to AM, request=" + request);
       }
@@ -260,6 +268,7 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
 
       if (response.shouldDie()) {
         LOG.info("Received should die response from AM");
+        askedToDie.set(true);
         return new ResponseWrapper(true, 1);
       }
       if (response.getLastRequestId() != requestId) {
@@ -276,10 +285,13 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
               + " heartbeat response, eventCount=" + 
response.getEvents().size());
         }
       } else {
+        task.setNextFromEventId(response.getNextFromEventId());
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Routing events from heartbeat response to task" + ", 
currentTaskAttemptId="
-                + task.getTaskAttemptID() + ", eventCount=" + 
response.getEvents().size());
+                + task.getTaskAttemptID() + ", eventCount=" + 
response.getEvents().size()
+                + " fromEventId=" + fromEventId
+                + " nextFromEventId=" + response.getNextFromEventId());
           }
           // This should ideally happen in a separate thread
           numEventsReceived = response.getEvents().size();
@@ -318,10 +330,16 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
      *           indicates an exception somewhere in the AM.
      */
     private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws 
IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), 
updateEventMetadata);
-      TezEvent taskCompletedEvent = new TezEvent(new 
TaskAttemptCompletedEvent(),
-          updateEventMetadata);
-      return !heartbeat(Lists.newArrayList(statusUpdateEvent, 
taskCompletedEvent)).shouldDie;
+      // Ensure only one final event is ever sent.
+      if (!finalEventQueued.getAndSet(true)) {
+        TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), 
updateEventMetadata);
+        TezEvent taskCompletedEvent = new TezEvent(new 
TaskAttemptCompletedEvent(),
+            updateEventMetadata);
+        return !heartbeat(Lists.newArrayList(statusUpdateEvent, 
taskCompletedEvent)).shouldDie;
+      } else {
+        LOG.warn("A final task state event has already been sent. Not sending 
again");
+        return askedToDie.get();
+      }
     }
 
     private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
@@ -353,15 +371,22 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
      */
     private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, 
String diagnostics,
                                EventMetaData srcMeta) throws IOException, 
TezException {
-      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), 
updateEventMetadata);
-      if (diagnostics == null) {
-        diagnostics = ExceptionUtils.getStackTrace(t);
+      // Ensure only one final event is ever sent.
+      if (!finalEventQueued.getAndSet(true)) {
+        TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), 
updateEventMetadata);
+        if (diagnostics == null) {
+          diagnostics = ExceptionUtils.getStackTrace(t);
+        } else {
+          diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+        }
+        TezEvent taskAttemptFailedEvent = new TezEvent(new 
TaskAttemptFailedEvent(diagnostics),
+            srcMeta == null ? updateEventMetadata : srcMeta);
+        return !heartbeat(Lists.newArrayList(statusUpdateEvent, 
taskAttemptFailedEvent)).shouldDie;
       } else {
-        diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
+        LOG.warn("A final task state event has already been sent. Not sending 
again");
+        return askedToDie.get();
       }
-      TezEvent taskAttemptFailedEvent = new TezEvent(new 
TaskAttemptFailedEvent(diagnostics),
-          srcMeta == null ? updateEventMetadata : srcMeta);
-      return !heartbeat(Lists.newArrayList(statusUpdateEvent, 
taskAttemptFailedEvent)).shouldDie;
+
     }
 
     private void addEvents(TezTaskAttemptID taskAttemptID, 
Collection<TezEvent> events) {

http://git-wip-us.apache.org/repos/asf/hive/blob/dc7ceb4e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d4eb8e5..60a18c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,7 +156,7 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.7.0-TEZ-2003-SNAPSHOT</tez.version>
+    <tez.version>0.8.0-TEZ-2003-SNAPSHOT</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
     <spark.version>1.3.0</spark.version>
     <scala.binary.version>2.10</scala.binary.version>

Reply via email to