HIVE-15722. LLAP: Avoid marking a query as complete if the AMReporter runs into an error. (Siddharth Seth, reviewed by Sergey Shelukhin, Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aabe83db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aabe83db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aabe83db

Branch: refs/heads/master
Commit: aabe83dbf303ac4891d41ced72916c72b46ae072
Parents: 888e5d7
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 27 11:52:52 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jan 27 11:52:52 2017 -0800

----------------------------------------------------------------------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 17 +++++-------
 .../hive/llap/daemon/impl/QueryTracker.java     | 29 ++++++++++++++++----
 2 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aabe83db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6d7d4de..8c33fa2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.Terminate
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -183,14 +182,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     SignableVertexSpec vertex = extractVertexSpec(request, tokenInfo);
     TezEvent initialEvent = extractInitialEvent(request, tokenInfo);
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Queueing container for execution: " + 
stringifySubmitRequest(request, vertex));
-    }
-    QueryIdentifierProto qIdProto = vertex.getQueryIdentifier();
     TezTaskAttemptID attemptId =
         Converters.createTaskAttemptId(vertex.getQueryIdentifier(), 
vertex.getVertexIndex(),
             request.getFragmentNumber(), request.getAttemptNumber());
     String fragmentIdString = attemptId.toString();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Queueing container for execution: fragemendId={}, {}",
+          fragmentIdString, stringifySubmitRequest(request, vertex));
+    }
+    QueryIdentifierProto qIdProto = vertex.getQueryIdentifier();
+
     HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), 
request.getContainerIdString(),
         localAddress.get().getHostName(), vertex.getDagName(), 
qIdProto.getDagIndex(),
         vertex.getVertexName(), request.getFragmentNumber(), 
request.getAttemptNumber());
@@ -478,11 +479,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   public void queryFailed(QueryIdentifier queryIdentifier) {
     LOG.info("Processing query failed notification for {}", queryIdentifier);
     List<QueryFragmentInfo> knownFragments;
-    try {
-      knownFragments = queryTracker.queryComplete(queryIdentifier, -1, true);
-    } catch (IOException e) {
-      throw new RuntimeException(e); // Should never happen here, no 
permission check.
-    }
+    knownFragments = queryTracker.getRegisteredFragments(queryIdentifier);
     LOG.info("DBG: Pending fragment count for failed query {} = {}", 
queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {

http://git-wip-us.apache.org/repos/asf/hive/blob/aabe83db/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index a7d7981..9eaddd2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.MDC;
 import org.apache.logging.slf4j.Log4jMarker;
 import org.apache.tez.common.CallableWithNdc;
@@ -141,14 +142,17 @@ public class QueryTracker extends AbstractService {
       String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
 
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
+    // Note: This is a readLock to prevent a race with queryComplete. 
Operations
+    // and mutations within this lock need to be on concurrent structures.
     dagLock.readLock().lock();
     try {
       if (completedDagMap.contains(queryIdentifier)) {
         // Cleanup the dag lock here, since it may have been created after the 
query completed
         dagSpecificLocks.remove(queryIdentifier);
-        throw new RuntimeException(
-            "Dag " + dagName + " already complete. Rejecting fragment ["
-                + vertexName + ", " + fragmentNumber + ", " + attemptNumber + 
"]");
+        String message = "Dag " + dagName + " already complete. Rejecting 
fragment ["
+            + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]";
+        LOG.info(message);
+        throw new RuntimeException(message);
       }
       // TODO: for now, we get the secure username out of UGI... after 
signing, we can take it
       //       out of the request provided that it's signed.
@@ -211,6 +215,22 @@ public class QueryTracker extends AbstractService {
     }
   }
 
+  List<QueryFragmentInfo> getRegisteredFragments(QueryIdentifier 
queryIdentifier) {
+    ReadWriteLock dagLock = getDagLock(queryIdentifier);
+    dagLock.readLock().lock();
+    try {
+      QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+      if (queryInfo == null) {
+        // Race with queryComplete
+        LOG.warn("Unknown query: Returning an empty list of fragments");
+        return Collections.emptyList();
+      }
+      return queryInfo.getRegisteredFragments();
+    } finally {
+      dagLock.readLock().unlock();
+    }
+  }
+
   /**
    * Register completion for a query
    * @param queryIdentifier
@@ -231,8 +251,7 @@ public class QueryTracker extends AbstractService {
           deleteDelay);
       queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
-        // One case where this happens is when a query is killed via an 
explicit signal, and then
-        // another message is received from teh AMHeartbeater.
+        // Should not happen.
         LOG.warn("Ignoring query complete for unknown dag: {}", 
queryIdentifier);
         return Collections.emptyList();
       }

Reply via email to