Repository: hive
Updated Branches:
  refs/heads/master c168af26d -> c5b4d66d5


http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
index 01dc2e1..83e5246 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
@@ -22,10 +22,10 @@ import com.google.protobuf.ByteString;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.EntityDescriptor;
@@ -49,9 +49,9 @@ public class Converters {
 
   public static TaskSpec getTaskSpecfromProto(SignableVertexSpec vectorProto,
       int fragmentNum, int attemptNum, TezTaskAttemptID attemptId) {
-    VertexIdentifier vertexId = vectorProto.getVertexIdentifier();
     TezTaskAttemptID taskAttemptID = attemptId != null ? attemptId
-        : createTaskAttemptId(vertexId, fragmentNum, attemptNum);
+        : createTaskAttemptId(vectorProto.getQueryIdentifier(), 
vectorProto.getVertexIndex(),
+        fragmentNum, attemptNum);
 
     ProcessorDescriptor processorDescriptor = null;
     if (vectorProto.hasProcessorDescriptor()) {
@@ -90,16 +90,16 @@ public class Converters {
   }
 
   public static TezTaskAttemptID createTaskAttemptId(
-      VertexIdentifier vertexId, int fragmentNum, int attemptNum) {
+      QueryIdentifierProto queryIdProto, int vertexIndex, int fragmentNum, int 
attemptNum) {
     // Come ride the API roller-coaster!
     return TezTaskAttemptID.getInstance(
             TezTaskID.getInstance(
                 TezVertexID.getInstance(
                     TezDAGID.getInstance(
                         ConverterUtils.toApplicationId(
-                            vertexId.getApplicationIdString()),
-                        vertexId.getDagId()),
-                    vertexId.getVertexId()),
+                            queryIdProto.getApplicationIdString()),
+                        queryIdProto.getDagIndex()),
+                    vertexIndex),
                 fragmentNum),
             attemptNum);
   }
@@ -116,23 +116,18 @@ public class Converters {
             ctx.getTaskIndex()),
         ctx.getTaskAttemptNumber());
   }
-  public static VertexIdentifier createVertexIdentifier(
-      TezTaskAttemptID taId, int appAttemptId) {
-    VertexIdentifier.Builder idBuilder = VertexIdentifier.newBuilder();
-    idBuilder.setApplicationIdString(
-        
taId.getTaskID().getVertexID().getDAGId().getApplicationId().toString());
-    idBuilder.setAppAttemptNumber(appAttemptId);
-    idBuilder.setDagId(taId.getTaskID().getVertexID().getDAGId().getId());
-    idBuilder.setVertexId(taId.getTaskID().getVertexID().getId());
-    return idBuilder.build();
-  }
 
-  public static SignableVertexSpec.Builder convertTaskSpecToProto(TaskSpec 
taskSpec,
-      int appAttemptId, String tokenIdentifier, String user) {
+  public static SignableVertexSpec.Builder 
constructSignableVertexSpec(TaskSpec taskSpec,
+                                                                       
QueryIdentifierProto queryIdentifierProto,
+                                                                       String 
tokenIdentifier,
+                                                                       String 
user,
+                                                                       String 
hiveQueryIdString) {
     TezTaskAttemptID tId = taskSpec.getTaskAttemptID();
 
     SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder();
-    builder.setVertexIdentifier(createVertexIdentifier(tId, appAttemptId));
+    builder.setQueryIdentifier(queryIdentifierProto);
+    builder.setHiveQueryId(hiveQueryIdString);
+    builder.setVertexIndex(tId.getTaskID().getVertexID().getId());
     builder.setDagName(taskSpec.getDAGName());
     builder.setVertexName(taskSpec.getVertexName());
     builder.setVertexParallelism(taskSpec.getVertexParallelism());

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto 
b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 92dda21..2e74c18 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -46,32 +46,27 @@ message GroupInputSpecProto {
   optional EntityDescriptorProto merged_input_descriptor = 3;
 }
 
-message VertexIdentifier {
-  optional string application_id_string = 1;
-  optional int32 app_attempt_number = 2;
-  optional int32 dag_id = 3;
-  optional int32 vertex_id = 4;
-}
-
 // The part of SubmitWork that can be signed 
 message SignableVertexSpec
 {
   optional string user = 1;
   optional int64 signatureKeyId = 2;
 
-  optional VertexIdentifier vertexIdentifier = 3;
+  optional QueryIdentifierProto query_identifier = 3;
+  optional string hive_query_id = 4;
   // Display names cannot be modified by the client for now. If needed, they 
should be sent to HS2 who will put them here.
-  optional string dag_name = 4;
-  optional string vertex_name = 5;
+  optional string dag_name = 5;
+  optional string vertex_name = 6;
+  optional int32 vertex_index = 7;
 
   // The core vertex stuff 
-  optional string token_identifier = 6;
-  optional EntityDescriptorProto processor_descriptor = 7;
-  repeated IOSpecProto input_specs = 8;
-  repeated IOSpecProto output_specs = 9;
-  repeated GroupInputSpecProto grouped_input_specs = 10;
+  optional string token_identifier = 8;
+  optional EntityDescriptorProto processor_descriptor = 9;
+  repeated IOSpecProto input_specs = 10;
+  repeated IOSpecProto output_specs = 11;
+  repeated GroupInputSpecProto grouped_input_specs = 12;
 
-  optional int32 vertex_parallelism = 11; // An internal field required for 
Tez.
+  optional int32 vertex_parallelism = 13; // An internal field required for 
Tez.
 }
 
 // Union
@@ -95,8 +90,9 @@ enum SourceStateProto {
 }
 
 message QueryIdentifierProto {
-  optional string app_identifier = 1;
-  optional int32 dag_identifier = 2;
+  optional string application_id_string = 1;
+  optional int32 dag_index= 2;
+  optional int32 app_attempt_number = 3;
 }
 
 /**
@@ -156,9 +152,8 @@ message SourceStateUpdatedResponseProto {
 }
 
 message QueryCompleteRequestProto {
-  optional string query_id = 1;
-  optional QueryIdentifierProto query_identifier = 2;
-  optional int64 delete_delay = 4 [default = 0];
+  optional QueryIdentifierProto query_identifier = 1;
+  optional int64 delete_delay = 2 [default = 0];
 }
 
 message QueryCompleteResponseProto {

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java 
b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
index 85c6091..e599711 100644
--- a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -77,12 +78,20 @@ public class TestConverters {
         new TaskSpec(tezTaskAttemptId, "dagName", "vertexName", 10, 
processorDescriptor,
             inputSpecList, outputSpecList, null);
 
-    SignableVertexSpec vertexProto = 
Converters.convertTaskSpecToProto(taskSpec, 0, "", "").build();
+    QueryIdentifierProto queryIdentifierProto =
+        
QueryIdentifierProto.newBuilder().setApplicationIdString(appId.toString())
+            .setAppAttemptNumber(333).setDagIndex(300).build();
+
+    SignableVertexSpec vertexProto = Converters
+        .constructSignableVertexSpec(taskSpec, queryIdentifierProto, "", "", 
"hiveQueryId").build();
 
     assertEquals("dagName", vertexProto.getDagName());
     assertEquals("vertexName", vertexProto.getVertexName());
-    assertEquals(appId.toString(), 
vertexProto.getVertexIdentifier().getApplicationIdString());
-    assertEquals(tezDagId.getId(), 
vertexProto.getVertexIdentifier().getDagId());
+    assertEquals("hiveQueryId", vertexProto.getHiveQueryId());
+    assertEquals(appId.toString(), 
vertexProto.getQueryIdentifier().getApplicationIdString());
+    assertEquals(tezDagId.getId(), 
vertexProto.getQueryIdentifier().getDagIndex());
+    assertEquals(333, vertexProto.getQueryIdentifier().getAppAttemptNumber());
+    assertEquals(tezVertexId.getId(), vertexProto.getVertexIndex());
     assertEquals(processorDescriptor.getClassName(),
         vertexProto.getProcessorDescriptor().getClassName());
     assertEquals(processorDescriptor.getUserPayload().getPayload(),
@@ -116,8 +125,14 @@ public class TestConverters {
     TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500);
     TezTaskAttemptID tezTaskAttemptId = 
TezTaskAttemptID.getInstance(tezTaskId, 600);
 
+    QueryIdentifierProto queryIdentifierProto =
+        
QueryIdentifierProto.newBuilder().setApplicationIdString(appId.toString())
+            .setAppAttemptNumber(333).setDagIndex(tezDagId.getId()).build();
+
     SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder();
-    
builder.setVertexIdentifier(Converters.createVertexIdentifier(tezTaskAttemptId, 
0));
+    builder.setQueryIdentifier(queryIdentifierProto);
+    builder.setHiveQueryId("hiveQueryId");
+    builder.setVertexIndex(tezVertexId.getId());
     builder.setDagName("dagName");
     builder.setVertexName("vertexName");
     builder.setProcessorDescriptor(

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 46030ec..6d63797 100644
--- 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -154,8 +154,10 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
 
     LOG.debug("Socket connected");
     SignableVertexSpec vertex = 
SignableVertexSpec.parseFrom(submitWorkInfo.getVertexBinary());
-    String fragmentId = 
Converters.createTaskAttemptId(vertex.getVertexIdentifier(),
-        request.getFragmentNumber(), request.getAttemptNumber()).toString();
+
+    String fragmentId =
+        Converters.createTaskAttemptId(vertex.getQueryIdentifier(), 
vertex.getVertexIndex(),
+            request.getFragmentNumber(), 
request.getAttemptNumber()).toString();
     OutputStream socketStream = socket.getOutputStream();
     LlapOutputSocketInitMessage.Builder builder =
         LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId);

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index ded84c1..2f9dea0 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecPro
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.NotTezEvent;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
@@ -50,12 +51,10 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -96,7 +95,6 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
   private final Map<String, String> localEnv = new HashMap<>();
   private final long memoryPerExecutor;
   private final LlapDaemonExecutorMetrics metrics;
-  private final Configuration conf;
   private final TaskRunnerCallable.ConfParams confParams;
   private final KilledTaskHandler killedTaskHandler = new 
KilledTaskHandlerImpl();
   private final HadoopShim tezHadoopShim;
@@ -110,7 +108,6 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
       AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, 
UgiFactory fsUgiFactory) {
     super("ContainerRunnerImpl");
-    this.conf = conf;
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
     this.localAddress = localAddress;
@@ -178,12 +175,13 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
     if (LOG.isInfoEnabled()) {
       LOG.info("Queueing container for execution: " + 
stringifySubmitRequest(request, vertex));
     }
-    VertexIdentifier vId = vertex.getVertexIdentifier();
-    TezTaskAttemptID attemptId = Converters.createTaskAttemptId(
-        vId, request.getFragmentNumber(), request.getAttemptNumber());
+    QueryIdentifierProto qIdProto = vertex.getQueryIdentifier();
+    TezTaskAttemptID attemptId =
+        Converters.createTaskAttemptId(vertex.getQueryIdentifier(), 
vertex.getVertexIndex(),
+            request.getFragmentNumber(), request.getAttemptNumber());
     String fragmentIdString = attemptId.toString();
-    HistoryLogger.logFragmentStart(vId.getApplicationIdString(), 
request.getContainerIdString(),
-        localAddress.get().getHostName(), vertex.getDagName(), vId.getDagId(),
+    HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), 
request.getContainerIdString(),
+        localAddress.get().getHostName(), vertex.getDagName(), 
qIdProto.getDagIndex(),
         vertex.getVertexName(), request.getFragmentNumber(), 
request.getAttemptNumber());
 
     // This is the start of container-annotated logging.
@@ -201,7 +199,7 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       int dagIdentifier = 
taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
 
       QueryIdentifier queryIdentifier = new QueryIdentifier(
-          vId.getApplicationIdString(), dagIdentifier);
+          qIdProto.getApplicationIdString(), dagIdentifier);
 
       Credentials credentials = new Credentials();
       DataInputBuffer dib = new DataInputBuffer();
@@ -212,7 +210,8 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       Token<JobTokenIdentifier> jobToken = 
TokenCache.getSessionToken(credentials);
 
       QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
-          queryIdentifier, vId.getApplicationIdString(), vertex.getDagName(), 
dagIdentifier,
+          queryIdentifier, qIdProto.getApplicationIdString(),
+          vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), 
request.getAttemptNumber(),
           vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo);
 
@@ -227,7 +226,7 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       Configuration callableConf = new Configuration(getConfig());
       UserGroupInformation taskUgi = fsUgiFactory == null ? null : 
fsUgiFactory.createUgi();
       TaskRunnerCallable callable = new TaskRunnerCallable(request, 
fragmentInfo, callableConf,
-          new LlapExecutionContext(localAddress.get().getHostName(), 
queryTracker), env,
+          new ExecutionContextImpl(localAddress.get().getHostName()), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, 
killedTaskHandler,
           this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi);
       submissionState = executorService.schedule(callable);
@@ -307,29 +306,13 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
     }
   }
 
-  private static class LlapExecutionContext extends ExecutionContextImpl
-      implements TezProcessor.Hook {
-    private final QueryTracker queryTracker;
-    public LlapExecutionContext(String hostname, QueryTracker queryTracker) {
-      super(hostname);
-      this.queryTracker = queryTracker;
-    }
-
-    @Override
-    public void initializeHook(TezProcessor source) {
-      queryTracker.registerDagQueryId(
-          new 
QueryIdentifier(source.getContext().getApplicationId().toString(),
-              source.getContext().getDagIdentifier()),
-          HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID));;
-    }
-  }
-
   @Override
   public SourceStateUpdatedResponseProto sourceStateUpdated(
       SourceStateUpdatedRequestProto request) throws IOException {
     LOG.info("Processing state update: " + 
stringifySourceStateUpdateRequest(request));
-    QueryIdentifier queryId = new 
QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
-        request.getQueryIdentifier().getDagIdentifier());
+    QueryIdentifier queryId =
+        new 
QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
+            request.getQueryIdentifier().getDagIndex());
     queryTracker.registerSourceStateChange(queryId, request.getSrcName(), 
request.getState());
     return SourceStateUpdatedResponseProto.getDefaultInstance();
   }
@@ -338,8 +321,8 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
   public QueryCompleteResponseProto queryComplete(
       QueryCompleteRequestProto request) throws IOException {
     QueryIdentifier queryIdentifier =
-        new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
-            request.getQueryIdentifier().getDagIdentifier());
+        new 
QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
+            request.getQueryIdentifier().getDagIndex());
     LOG.info("Processing queryComplete notification for {}", queryIdentifier);
     List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
         queryIdentifier, request.getDeleteDelay(), false);
@@ -369,8 +352,8 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
 
   private String 
stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
     StringBuilder sb = new StringBuilder();
-    QueryIdentifier queryIdentifier = new 
QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
-        request.getQueryIdentifier().getDagIdentifier());
+    QueryIdentifier queryIdentifier = new 
QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
+        request.getQueryIdentifier().getDagIndex());
     sb.append("queryIdentifier=").append(queryIdentifier)
         .append(", ").append("sourceName=").append(request.getSrcName())
         .append(", ").append("state=").append(request.getState());
@@ -381,11 +364,12 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       SubmitWorkRequestProto request, SignableVertexSpec vertex) {
     StringBuilder sb = new StringBuilder();
     
sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort());
-    sb.append(", taskInfo=").append(vertex.getVertexIdentifier()).append(" 
fragment ")
+    sb.append(", taskInfo=").append(" fragment ")
       .append(request.getFragmentNumber()).append(" attempt 
").append(request.getAttemptNumber());
     sb.append(", user=").append(vertex.getUser());
-    sb.append(", 
appIdString=").append(vertex.getVertexIdentifier().getApplicationIdString());
-    sb.append(", 
appAttemptNum=").append(vertex.getVertexIdentifier().getAppAttemptNumber());
+    sb.append(", queryId=").append(vertex.getHiveQueryId());
+    sb.append(", 
appIdString=").append(vertex.getQueryIdentifier().getApplicationIdString());
+    sb.append(", 
appAttemptNum=").append(vertex.getQueryIdentifier().getAppAttemptNumber());
     sb.append(", containerIdString=").append(request.getContainerIdString());
     sb.append(", dagName=").append(vertex.getDagName());
     sb.append(", vertexName=").append(vertex.getVertexName());

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index a965872..5366b9f 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -117,9 +117,12 @@ public class QueryTracker extends AbstractService {
    * Register a new fragment for a specific query
    */
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String 
appIdString,
-      String dagName, int dagIdentifier, String vertexName, int 
fragmentNumber, int attemptNumber,
+      String dagName, String hiveQueryIdString, int dagIdentifier, String 
vertexName, int fragmentNumber, int attemptNumber,
       String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> 
appToken,
       String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
+    // QueryIdentifier is enough to uniquely identify a fragment. At the 
moment, it works off of appId and dag index.
+    // At a later point this could be changed to the Hive query identifier.
+    // Sending both over RPC is unnecessary overhead.
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.readLock().lock();
     try {
@@ -157,6 +160,8 @@ public class QueryTracker extends AbstractService {
             queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier());
       }
 
+      queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, 
hiveQueryIdString);
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("Registering request for {} with the ShuffleHandler", 
queryIdentifier);
       }
@@ -288,11 +293,6 @@ public class QueryTracker extends AbstractService {
     return dagMap;
   }
 
-  public void registerDagQueryId(QueryIdentifier queryIdentifier, String 
hiveQueryIdString) {
-    if (hiveQueryIdString == null) return;
-    queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, 
hiveQueryIdString);
-  }
-
   @Override
   public void serviceStart() {
     LOG.info(getName() + " started");

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 6c853a6..143e755 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -237,7 +237,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
         if (shouldRunTask) {
           taskRunner = new TezTaskRunner2(conf, taskUgi, 
fragmentInfo.getLocalDirs(),
               taskSpec,
-              vertex.getVertexIdentifier().getAppAttemptNumber(),
+              vertex.getQueryIdentifier().getAppAttemptNumber(),
               serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, 
executor,
               objectRegistry,
               pid,
@@ -480,7 +480,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     }
 
     protected void logFragmentEnd(boolean success) {
-      
HistoryLogger.logFragmentEnd(vertex.getVertexIdentifier().getApplicationIdString(),
+      
HistoryLogger.logFragmentEnd(vertex.getQueryIdentifier().getApplicationIdString(),
           request.getContainerIdString(), executionContext.getHostName(), 
vertex.getDagName(),
           fragmentInfo.getQueryInfo().getDagIdentifier(), 
vertex.getVertexName(),
           request.getFragmentNumber(), request.getAttemptNumber(), 
taskRunnerCallable.threadName,
@@ -504,7 +504,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   public static String getTaskIdentifierString(
       SubmitWorkRequestProto request, SignableVertexSpec vertex) {
     StringBuilder sb = new StringBuilder();
-    
sb.append("AppId=").append(vertex.getVertexIdentifier().getApplicationIdString())
+    
sb.append("AppId=").append(vertex.getQueryIdentifier().getApplicationIdString())
         .append(", containerId=").append(request.getContainerIdString())
         .append(", Dag=").append(vertex.getDagName())
         .append(", Vertex=").append(vertex.getVertexName())

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index fe2ced5..cc2ca25 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -26,16 +26,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
-import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
@@ -97,8 +95,6 @@ public class TaskExecutorTestHelpers {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
-    TezTaskID tId = TezTaskID.getInstance(vId, 389);
-    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
     return SubmitWorkRequestProto
         .newBuilder()
         .setAttemptNumber(0)
@@ -109,7 +105,13 @@ public class TaskExecutorTestHelpers {
                 .setDagName("MockDag")
                 .setUser("MockUser")
                 .setTokenIdentifier("MockToken_1")
-                .setVertexIdentifier(Converters.createVertexIdentifier(taId, 
0))
+                .setQueryIdentifier(
+                    QueryIdentifierProto.newBuilder()
+                        .setApplicationIdString(appId.toString())
+                        .setAppAttemptNumber(0)
+                        .setDagIndex(dagId.getId())
+                        .build())
+                .setVertexIndex(vId.getId())
                 .setVertexName("MockVertex")
                 .setProcessorDescriptor(
                     LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder()

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index ac48a3a..53c19b4 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -20,43 +20,23 @@ package org.apache.hadoop.hive.llap.daemon.impl.comparator;
 import static 
org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
-import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;
-import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo;
 import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
-import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
-import org.apache.hadoop.hive.llap.tez.Converters;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.hadoop.shim.DefaultHadoopShim;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.task.EndReason;
-import org.apache.tez.runtime.task.TaskRunner2Result;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestFirstInFirstOutComparator {
-  private static Configuration conf;
-  private static Credentials cred = new Credentials();
-
-  @Before
-  public void setup() {
-    conf = new Configuration();
-  }
 
   private SubmitWorkRequestProto createRequest(int fragmentNumber, int 
numSelfAndUpstreamTasks, int dagStartTime,
                                                int attemptStartTime) {
@@ -80,7 +60,13 @@ public class TestFirstInFirstOutComparator {
             VertexOrBinary.newBuilder().setVertex(
             SignableVertexSpec
                 .newBuilder()
-                .setVertexIdentifier(Converters.createVertexIdentifier(taId, 
0))
+                .setQueryIdentifier(
+                    QueryIdentifierProto.newBuilder()
+                        .setApplicationIdString(appId.toString())
+                        .setAppAttemptNumber(0)
+                        .setDagIndex(dagId.getId())
+                        .build())
+                .setVertexIndex(vId.getId())
                 .setDagName("MockDag")
                 .setVertexName("MockVertex")
                 .setUser("MockUser")

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index fcf3378..718f6fd 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -70,10 +70,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -82,7 +84,6 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
@@ -107,7 +108,6 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
   private long deleteDelayOnDagComplete;
   private final LlapTaskUmbilicalProtocol umbilical;
   private final Token<LlapTokenIdentifier> token;
-  private final int appAttemptId;
   private final String user;
 
   // These two structures track the list of known nodes, and the list of nodes 
which are sending in keep-alive heartbeats.
@@ -118,6 +118,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
   private final LlapRegistryService serviceRegistry;
 
   private volatile QueryIdentifierProto currentQueryIdentifierProto;
+  private volatile String currentHiveQueryId;
 
   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
@@ -143,7 +144,6 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
     // TODO Avoid reading this from the environment
     user = System.getenv(ApplicationConstants.Environment.USER.name());
-    appAttemptId = 
taskCommunicatorContext.getApplicationAttemptId().getAttemptId();
 
     credentialMap = new ConcurrentHashMap<>();
     sourceStateTracker = new SourceStateTracker(getContext(), this);
@@ -262,8 +262,18 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     super.registerRunningTaskAttempt(containerId, taskSpec, 
additionalResources, credentials,
         credentialsChanged, priority);
     int dagId = 
taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
-    if (currentQueryIdentifierProto == null || (dagId != 
currentQueryIdentifierProto.getDagIdentifier())) {
-      resetCurrentDag(dagId);
+    if (currentQueryIdentifierProto == null || (dagId != 
currentQueryIdentifierProto.getDagIndex())) {
+      // TODO HiveQueryId extraction by parsing the Processor payload is ugly. 
This can be improved
+      // once TEZ-2672 is fixed.
+      String hiveQueryId;
+      try {
+        hiveQueryId = extractQueryId(taskSpec);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to extract query id from task spec: 
" + taskSpec, e);
+      }
+      Preconditions.checkNotNull(hiveQueryId, "Unexpected null query id");
+
+      resetCurrentDag(dagId, hiveQueryId);
     }
 
 
@@ -292,7 +302,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     SubmitWorkRequestProto requestProto;
 
     try {
-      requestProto = constructSubmitWorkRequest(containerId, taskSpec, 
fragmentRuntimeInfo);
+      requestProto = constructSubmitWorkRequest(containerId, taskSpec, 
fragmentRuntimeInfo, currentHiveQueryId);
     } catch (IOException e) {
       throw new RuntimeException("Failed to construct request", e);
     }
@@ -388,7 +398,9 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     // NodeId can be null if the task gets unregistered due to failure / being 
killed by the daemon itself
     if (nodeId != null) {
       TerminateFragmentRequestProto request =
-          
TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto)
+          TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(
+              constructQueryIdentifierProto(
+                  taskAttemptId.getTaskID().getVertexID().getDAGId().getId()))
               .setFragmentIdentifierString(taskAttemptId.toString()).build();
       communicator.sendTerminateFragment(request, nodeId.getHostname(), 
nodeId.getPort(),
           new 
LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>()
 {
@@ -415,9 +427,9 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
   @Override
   public void dagComplete(final int dagIdentifier) {
+    QueryIdentifierProto queryIdentifierProto = 
constructQueryIdentifierProto(dagIdentifier);
     QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
-        .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier))
-        .setDeleteDelay(deleteDelayOnDagComplete).build();
+        
.setQueryIdentifier(queryIdentifierProto).setDeleteDelay(deleteDelayOnDagComplete).build();
     for (final LlapNodeId llapNodeId : nodesForQuery) {
       LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, 
llapNodeId);
       communicator.sendQueryComplete(request, llapNodeId.getHostname(), 
llapNodeId.getPort(),
@@ -597,19 +609,29 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     }
   }
 
-  private void resetCurrentDag(int newDagId) {
+  private void resetCurrentDag(int newDagId, String hiveQueryId) {
     // Working on the assumption that a single DAG runs at a time per AM.
+
     currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId);
-    sourceStateTracker.resetState(newDagId);
+    currentHiveQueryId = hiveQueryId;
+    sourceStateTracker.resetState(currentQueryIdentifierProto);
     nodesForQuery.clear();
-    LOG.info("CurrentDagId set to: " + newDagId + ", name=" + 
getContext().getCurrentDagInfo().getName());
+    LOG.info("CurrentDagId set to: " + newDagId + ", name=" +
+        getContext().getCurrentDagInfo().getName() + ", queryId=" + 
hiveQueryId);
     // TODO Is it possible for heartbeats to come in from lost tasks - those 
should be told to die, which
     // is likely already happening.
   }
 
+  private String extractQueryId(TaskSpec taskSpec) throws IOException {
+    UserPayload processorPayload = 
taskSpec.getProcessorDescriptor().getUserPayload();
+    Configuration conf = TezUtils.createConfFromUserPayload(processorPayload);
+    return HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
+  }
+
   private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId 
containerId,
                                                             TaskSpec taskSpec,
-                                                            
FragmentRuntimeInfo fragmentRuntimeInfo) throws
+                                                            
FragmentRuntimeInfo fragmentRuntimeInfo,
+                                                            String 
hiveQueryId) throws
       IOException {
     SubmitWorkRequestProto.Builder builder = 
SubmitWorkRequestProto.newBuilder();
     builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
@@ -618,7 +640,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     builder.setAmHost(getAddress().getHostName());
     builder.setAmPort(getAddress().getPort());
 
-    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+    Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
         
taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
     ByteBuffer credentialsBinary = 
credentialMap.get(currentQueryIdentifierProto);
     if (credentialsBinary == null) {
@@ -628,8 +650,8 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
       credentialsBinary = credentialsBinary.duplicate();
     }
     builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
-    
builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.convertTaskSpecToProto(
-        taskSpec, appAttemptId, getTokenIdentifier(), user)).build());
+    
builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.constructSignableVertexSpec(
+        taskSpec, currentQueryIdentifierProto, getTokenIdentifier(), user, 
hiveQueryId)).build());
     // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments
     builder.setFragmentRuntimeInfo(fragmentRuntimeInfo);
     return builder.build();
@@ -843,7 +865,8 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
   private QueryIdentifierProto constructQueryIdentifierProto(int 
dagIdentifier) {
     return QueryIdentifierProto.newBuilder()
-        
.setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier)
+        
.setApplicationIdString(getContext().getCurrentAppIdentifier()).setDagIndex(dagIdentifier)
+        
.setAppAttemptNumber(getContext().getApplicationAttemptId().getAttemptId())
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index 3dd73f6..2a66e4d 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -44,8 +44,6 @@ public class SourceStateTracker {
   private final TaskCommunicatorContext taskCommunicatorContext;
   private final LlapTaskCommunicator taskCommunicator;
 
-  private final QueryIdentifierProto BASE_QUERY_IDENTIFIER;
-
   // Tracks vertices for which notifications have been registered
   private final Set<String> notificationRegisteredVertices = new HashSet<>();
 
@@ -58,19 +56,16 @@ public class SourceStateTracker {
                             LlapTaskCommunicator taskCommunicator) {
     this.taskCommunicatorContext = taskCommunicatorContext;
     this.taskCommunicator = taskCommunicator;
-    BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder()
-        
.setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build();
   }
 
   /**
    * To be invoked after each DAG completes.
    */
-  public synchronized void resetState(int newDagId) {
+  public synchronized void resetState(QueryIdentifierProto 
currentQueryIdentifierProto) {
     sourceInfoMap.clear();
     nodeInfoMap.clear();
     notificationRegisteredVertices.clear();
-    this.currentQueryIdentifier =
-        
QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build();
+    this.currentQueryIdentifier = currentQueryIdentifierProto;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
index 1901328..0f28f70 100644
--- 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
@@ -351,11 +353,21 @@ public class TestLlapTaskCommunicator {
 
     private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID 
vertexId, int taskIdx) {
       TaskSpec taskSpec = mock(TaskSpec.class);
+      Configuration conf = new Configuration(false);
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYID, "fakeQueryId");
+      UserPayload userPayload;
+      try {
+        userPayload = TezUtils.createUserPayloadFromConf(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
       TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(
           TezTaskID.getInstance(vertexId, taskIdx), 0);
       doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
       doReturn(DAG_NAME).when(taskSpec).getDAGName();
       doReturn(vertexName).when(taskSpec).getVertexName();
+      ProcessorDescriptor processorDescriptor = 
ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload);
+      doReturn(processorDescriptor).when(taskSpec).getProcessorDescriptor();
       return taskSpec;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index bdf254b..0705e2e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.hive.llap.SubmitWorkInfo;
 import org.apache.hadoop.hive.llap.TypeDesc;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.security.LlapSigner;
 import org.apache.hadoop.hive.llap.security.LlapSigner.Signable;
@@ -370,8 +371,14 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
         // 2. Generate the vertex/submit information for all events.
         if (i == 0) {
+          // The queryId could either be picked up from the current request 
being processed, or
+          // generated. The current request isn't exactly correct since the 
query is 'done' once we
+          // return the results. Generating a new one has the added benefit of 
working once this
+          // is moved out of a UDTF into a proper API.
+          // Setting this to the generated AppId which is unique.
           // Despite the differences in TaskSpec, the vertex spec should be 
the same.
-          signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, 
queryUser);
+          signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, 
queryUser,
+              applicationId.toString());
         }
 
         SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
@@ -426,10 +433,12 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   }
 
   private SignedMessage createSignedVertexSpec(LlapSigner signer, TaskSpec 
taskSpec,
-      ApplicationId applicationId, String queryUser) throws IOException {
-
-    final SignableVertexSpec.Builder svsb = Converters.convertTaskSpecToProto(
-        taskSpec, 0, applicationId.toString(), queryUser);
+      ApplicationId applicationId, String queryUser, String queryIdString) 
throws IOException {
+    QueryIdentifierProto queryIdentifierProto =
+        
QueryIdentifierProto.newBuilder().setApplicationIdString(applicationId.toString())
+            
.setDagIndex(taskSpec.getDagIdentifier()).setAppAttemptNumber(0).build();
+    final SignableVertexSpec.Builder svsb = 
Converters.constructSignableVertexSpec(
+        taskSpec, queryIdentifierProto, applicationId.toString(), queryUser, 
queryIdString);
     if (signer == null) {
       SignedMessage result = new SignedMessage();
       result.message = serializeVertexSpec(svsb);

Reply via email to