Repository: hive Updated Branches: refs/heads/master c168af26d -> c5b4d66d5
http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java index 01dc2e1..83e5246 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java @@ -22,10 +22,10 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.EntityDescriptor; @@ -49,9 +49,9 @@ public class Converters { public static TaskSpec getTaskSpecfromProto(SignableVertexSpec vectorProto, int fragmentNum, int attemptNum, TezTaskAttemptID attemptId) { - VertexIdentifier vertexId = vectorProto.getVertexIdentifier(); TezTaskAttemptID taskAttemptID = attemptId != null ? attemptId - : createTaskAttemptId(vertexId, fragmentNum, attemptNum); + : createTaskAttemptId(vectorProto.getQueryIdentifier(), vectorProto.getVertexIndex(), + fragmentNum, attemptNum); ProcessorDescriptor processorDescriptor = null; if (vectorProto.hasProcessorDescriptor()) { @@ -90,16 +90,16 @@ public class Converters { } public static TezTaskAttemptID createTaskAttemptId( - VertexIdentifier vertexId, int fragmentNum, int attemptNum) { + QueryIdentifierProto queryIdProto, int vertexIndex, int fragmentNum, int attemptNum) { // Come ride the API roller-coaster! return TezTaskAttemptID.getInstance( TezTaskID.getInstance( TezVertexID.getInstance( TezDAGID.getInstance( ConverterUtils.toApplicationId( - vertexId.getApplicationIdString()), - vertexId.getDagId()), - vertexId.getVertexId()), + queryIdProto.getApplicationIdString()), + queryIdProto.getDagIndex()), + vertexIndex), fragmentNum), attemptNum); } @@ -116,23 +116,18 @@ public class Converters { ctx.getTaskIndex()), ctx.getTaskAttemptNumber()); } - public static VertexIdentifier createVertexIdentifier( - TezTaskAttemptID taId, int appAttemptId) { - VertexIdentifier.Builder idBuilder = VertexIdentifier.newBuilder(); - idBuilder.setApplicationIdString( - taId.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); - idBuilder.setAppAttemptNumber(appAttemptId); - idBuilder.setDagId(taId.getTaskID().getVertexID().getDAGId().getId()); - idBuilder.setVertexId(taId.getTaskID().getVertexID().getId()); - return idBuilder.build(); - } - public static SignableVertexSpec.Builder convertTaskSpecToProto(TaskSpec taskSpec, - int appAttemptId, String tokenIdentifier, String user) { + public static SignableVertexSpec.Builder constructSignableVertexSpec(TaskSpec taskSpec, + QueryIdentifierProto queryIdentifierProto, + String tokenIdentifier, + String user, + String hiveQueryIdString) { TezTaskAttemptID tId = taskSpec.getTaskAttemptID(); SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder(); - builder.setVertexIdentifier(createVertexIdentifier(tId, appAttemptId)); + builder.setQueryIdentifier(queryIdentifierProto); + builder.setHiveQueryId(hiveQueryIdString); + builder.setVertexIndex(tId.getTaskID().getVertexID().getId()); builder.setDagName(taskSpec.getDAGName()); builder.setVertexName(taskSpec.getVertexName()); builder.setVertexParallelism(taskSpec.getVertexParallelism()); http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-common/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 92dda21..2e74c18 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -46,32 +46,27 @@ message GroupInputSpecProto { optional EntityDescriptorProto merged_input_descriptor = 3; } -message VertexIdentifier { - optional string application_id_string = 1; - optional int32 app_attempt_number = 2; - optional int32 dag_id = 3; - optional int32 vertex_id = 4; -} - // The part of SubmitWork that can be signed message SignableVertexSpec { optional string user = 1; optional int64 signatureKeyId = 2; - optional VertexIdentifier vertexIdentifier = 3; + optional QueryIdentifierProto query_identifier = 3; + optional string hive_query_id = 4; // Display names cannot be modified by the client for now. If needed, they should be sent to HS2 who will put them here. - optional string dag_name = 4; - optional string vertex_name = 5; + optional string dag_name = 5; + optional string vertex_name = 6; + optional int32 vertex_index = 7; // The core vertex stuff - optional string token_identifier = 6; - optional EntityDescriptorProto processor_descriptor = 7; - repeated IOSpecProto input_specs = 8; - repeated IOSpecProto output_specs = 9; - repeated GroupInputSpecProto grouped_input_specs = 10; + optional string token_identifier = 8; + optional EntityDescriptorProto processor_descriptor = 9; + repeated IOSpecProto input_specs = 10; + repeated IOSpecProto output_specs = 11; + repeated GroupInputSpecProto grouped_input_specs = 12; - optional int32 vertex_parallelism = 11; // An internal field required for Tez. + optional int32 vertex_parallelism = 13; // An internal field required for Tez. } // Union @@ -95,8 +90,9 @@ enum SourceStateProto { } message QueryIdentifierProto { - optional string app_identifier = 1; - optional int32 dag_identifier = 2; + optional string application_id_string = 1; + optional int32 dag_index= 2; + optional int32 app_attempt_number = 3; } /** @@ -156,9 +152,8 @@ message SourceStateUpdatedResponseProto { } message QueryCompleteRequestProto { - optional string query_id = 1; - optional QueryIdentifierProto query_identifier = 2; - optional int64 delete_delay = 4 [default = 0]; + optional QueryIdentifierProto query_identifier = 1; + optional int64 delete_delay = 2 [default = 0]; } message QueryCompleteResponseProto { http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java index 85c6091..e599711 100644 --- a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java +++ b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -77,12 +78,20 @@ public class TestConverters { new TaskSpec(tezTaskAttemptId, "dagName", "vertexName", 10, processorDescriptor, inputSpecList, outputSpecList, null); - SignableVertexSpec vertexProto = Converters.convertTaskSpecToProto(taskSpec, 0, "", "").build(); + QueryIdentifierProto queryIdentifierProto = + QueryIdentifierProto.newBuilder().setApplicationIdString(appId.toString()) + .setAppAttemptNumber(333).setDagIndex(300).build(); + + SignableVertexSpec vertexProto = Converters + .constructSignableVertexSpec(taskSpec, queryIdentifierProto, "", "", "hiveQueryId").build(); assertEquals("dagName", vertexProto.getDagName()); assertEquals("vertexName", vertexProto.getVertexName()); - assertEquals(appId.toString(), vertexProto.getVertexIdentifier().getApplicationIdString()); - assertEquals(tezDagId.getId(), vertexProto.getVertexIdentifier().getDagId()); + assertEquals("hiveQueryId", vertexProto.getHiveQueryId()); + assertEquals(appId.toString(), vertexProto.getQueryIdentifier().getApplicationIdString()); + assertEquals(tezDagId.getId(), vertexProto.getQueryIdentifier().getDagIndex()); + assertEquals(333, vertexProto.getQueryIdentifier().getAppAttemptNumber()); + assertEquals(tezVertexId.getId(), vertexProto.getVertexIndex()); assertEquals(processorDescriptor.getClassName(), vertexProto.getProcessorDescriptor().getClassName()); assertEquals(processorDescriptor.getUserPayload().getPayload(), @@ -116,8 +125,14 @@ public class TestConverters { TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500); TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tezTaskId, 600); + QueryIdentifierProto queryIdentifierProto = + QueryIdentifierProto.newBuilder().setApplicationIdString(appId.toString()) + .setAppAttemptNumber(333).setDagIndex(tezDagId.getId()).build(); + SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder(); - builder.setVertexIdentifier(Converters.createVertexIdentifier(tezTaskAttemptId, 0)); + builder.setQueryIdentifier(queryIdentifierProto); + builder.setHiveQueryId("hiveQueryId"); + builder.setVertexIndex(tezVertexId.getId()); builder.setDagName("dagName"); builder.setVertexName("vertexName"); builder.setProcessorDescriptor( http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 46030ec..6d63797 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -154,8 +154,10 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> LOG.debug("Socket connected"); SignableVertexSpec vertex = SignableVertexSpec.parseFrom(submitWorkInfo.getVertexBinary()); - String fragmentId = Converters.createTaskAttemptId(vertex.getVertexIdentifier(), - request.getFragmentNumber(), request.getAttemptNumber()).toString(); + + String fragmentId = + Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(), + request.getFragmentNumber(), request.getAttemptNumber()).toString(); OutputStream socketStream = socket.getOutputStream(); LlapOutputSocketInitMessage.Builder builder = LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId); http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index ded84c1..2f9dea0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecPro import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.NotTezEvent; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; @@ -50,12 +51,10 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; -import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -96,7 +95,6 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu private final Map<String, String> localEnv = new HashMap<>(); private final long memoryPerExecutor; private final LlapDaemonExecutorMetrics metrics; - private final Configuration conf; private final TaskRunnerCallable.ConfParams confParams; private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl(); private final HadoopShim tezHadoopShim; @@ -110,7 +108,6 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory) { super("ContainerRunnerImpl"); - this.conf = conf; Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); this.localAddress = localAddress; @@ -178,12 +175,13 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu if (LOG.isInfoEnabled()) { LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex)); } - VertexIdentifier vId = vertex.getVertexIdentifier(); - TezTaskAttemptID attemptId = Converters.createTaskAttemptId( - vId, request.getFragmentNumber(), request.getAttemptNumber()); + QueryIdentifierProto qIdProto = vertex.getQueryIdentifier(); + TezTaskAttemptID attemptId = + Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(), + request.getFragmentNumber(), request.getAttemptNumber()); String fragmentIdString = attemptId.toString(); - HistoryLogger.logFragmentStart(vId.getApplicationIdString(), request.getContainerIdString(), - localAddress.get().getHostName(), vertex.getDagName(), vId.getDagId(), + HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), vertex.getDagName(), qIdProto.getDagIndex(), vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); // This is the start of container-annotated logging. @@ -201,7 +199,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); QueryIdentifier queryIdentifier = new QueryIdentifier( - vId.getApplicationIdString(), dagIdentifier); + qIdProto.getApplicationIdString(), dagIdentifier); Credentials credentials = new Credentials(); DataInputBuffer dib = new DataInputBuffer(); @@ -212,7 +210,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); QueryFragmentInfo fragmentInfo = queryTracker.registerFragment( - queryIdentifier, vId.getApplicationIdString(), vertex.getDagName(), dagIdentifier, + queryIdentifier, qIdProto.getApplicationIdString(), + vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier, vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo); @@ -227,7 +226,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu Configuration callableConf = new Configuration(getConfig()); UserGroupInformation taskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, - new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, + new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi); submissionState = executorService.schedule(callable); @@ -307,29 +306,13 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } } - private static class LlapExecutionContext extends ExecutionContextImpl - implements TezProcessor.Hook { - private final QueryTracker queryTracker; - public LlapExecutionContext(String hostname, QueryTracker queryTracker) { - super(hostname); - this.queryTracker = queryTracker; - } - - @Override - public void initializeHook(TezProcessor source) { - queryTracker.registerDagQueryId( - new QueryIdentifier(source.getContext().getApplicationId().toString(), - source.getContext().getDagIdentifier()), - HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID));; - } - } - @Override public SourceStateUpdatedResponseProto sourceStateUpdated( SourceStateUpdatedRequestProto request) throws IOException { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); - QueryIdentifier queryId = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), - request.getQueryIdentifier().getDagIdentifier()); + QueryIdentifier queryId = + new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(), + request.getQueryIdentifier().getDagIndex()); queryTracker.registerSourceStateChange(queryId, request.getSrcName(), request.getState()); return SourceStateUpdatedResponseProto.getDefaultInstance(); } @@ -338,8 +321,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu public QueryCompleteResponseProto queryComplete( QueryCompleteRequestProto request) throws IOException { QueryIdentifier queryIdentifier = - new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), - request.getQueryIdentifier().getDagIdentifier()); + new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(), + request.getQueryIdentifier().getDagIndex()); LOG.info("Processing queryComplete notification for {}", queryIdentifier); List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete( queryIdentifier, request.getDeleteDelay(), false); @@ -369,8 +352,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { StringBuilder sb = new StringBuilder(); - QueryIdentifier queryIdentifier = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(), - request.getQueryIdentifier().getDagIdentifier()); + QueryIdentifier queryIdentifier = new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(), + request.getQueryIdentifier().getDagIndex()); sb.append("queryIdentifier=").append(queryIdentifier) .append(", ").append("sourceName=").append(request.getSrcName()) .append(", ").append("state=").append(request.getState()); @@ -381,11 +364,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu SubmitWorkRequestProto request, SignableVertexSpec vertex) { StringBuilder sb = new StringBuilder(); sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort()); - sb.append(", taskInfo=").append(vertex.getVertexIdentifier()).append(" fragment ") + sb.append(", taskInfo=").append(" fragment ") .append(request.getFragmentNumber()).append(" attempt ").append(request.getAttemptNumber()); sb.append(", user=").append(vertex.getUser()); - sb.append(", appIdString=").append(vertex.getVertexIdentifier().getApplicationIdString()); - sb.append(", appAttemptNum=").append(vertex.getVertexIdentifier().getAppAttemptNumber()); + sb.append(", queryId=").append(vertex.getHiveQueryId()); + sb.append(", appIdString=").append(vertex.getQueryIdentifier().getApplicationIdString()); + sb.append(", appAttemptNum=").append(vertex.getQueryIdentifier().getAppAttemptNumber()); sb.append(", containerIdString=").append(request.getContainerIdString()); sb.append(", dagName=").append(vertex.getDagName()); sb.append(", vertexName=").append(vertex.getVertexName()); http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index a965872..5366b9f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -117,9 +117,12 @@ public class QueryTracker extends AbstractService { * Register a new fragment for a specific query */ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, - String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, + String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken, String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException { + // QueryIdentifier is enough to uniquely identify a fragment. At the moment, it works off of appId and dag index. + // At a later point this could be changed to the Hive query identifier. + // Sending both over RPC is unnecessary overhead. ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.readLock().lock(); try { @@ -157,6 +160,8 @@ public class QueryTracker extends AbstractService { queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier()); } + queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString); + if (LOG.isDebugEnabled()) { LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); } @@ -288,11 +293,6 @@ public class QueryTracker extends AbstractService { return dagMap; } - public void registerDagQueryId(QueryIdentifier queryIdentifier, String hiveQueryIdString) { - if (hiveQueryIdString == null) return; - queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString); - } - @Override public void serviceStart() { LOG.info(getName() + " started"); http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 6c853a6..143e755 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -237,7 +237,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { if (shouldRunTask) { taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(), taskSpec, - vertex.getVertexIdentifier().getAppAttemptNumber(), + vertex.getQueryIdentifier().getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, @@ -480,7 +480,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } protected void logFragmentEnd(boolean success) { - HistoryLogger.logFragmentEnd(vertex.getVertexIdentifier().getApplicationIdString(), + HistoryLogger.logFragmentEnd(vertex.getQueryIdentifier().getApplicationIdString(), request.getContainerIdString(), executionContext.getHostName(), vertex.getDagName(), fragmentInfo.getQueryInfo().getDagIdentifier(), vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), taskRunnerCallable.threadName, @@ -504,7 +504,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { public static String getTaskIdentifierString( SubmitWorkRequestProto request, SignableVertexSpec vertex) { StringBuilder sb = new StringBuilder(); - sb.append("AppId=").append(vertex.getVertexIdentifier().getApplicationIdString()) + sb.append("AppId=").append(vertex.getQueryIdentifier().getApplicationIdString()) .append(", containerId=").append(request.getContainerIdString()) .append(", Dag=").append(vertex.getDagName()) .append(", Vertex=").append(vertex.getVertexName()) http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index fe2ced5..cc2ca25 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -26,16 +26,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; -import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; @@ -97,8 +95,6 @@ public class TaskExecutorTestHelpers { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); - TezTaskID tId = TezTaskID.getInstance(vId, 389); - TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); return SubmitWorkRequestProto .newBuilder() .setAttemptNumber(0) @@ -109,7 +105,13 @@ public class TaskExecutorTestHelpers { .setDagName("MockDag") .setUser("MockUser") .setTokenIdentifier("MockToken_1") - .setVertexIdentifier(Converters.createVertexIdentifier(taId, 0)) + .setQueryIdentifier( + QueryIdentifierProto.newBuilder() + .setApplicationIdString(appId.toString()) + .setAppAttemptNumber(0) + .setDagIndex(dagId.getId()) + .build()) + .setVertexIndex(vId.getId()) .setVertexName("MockVertex") .setProcessorDescriptor( LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder() http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index ac48a3a..53c19b4 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -20,43 +20,23 @@ package org.apache.hadoop.hive.llap.daemon.impl.comparator; import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; -import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue; -import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; -import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; -import org.apache.hadoop.hive.llap.tez.Converters; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.hadoop.shim.DefaultHadoopShim; -import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.task.EndReason; -import org.apache.tez.runtime.task.TaskRunner2Result; -import org.junit.Before; import org.junit.Test; public class TestFirstInFirstOutComparator { - private static Configuration conf; - private static Credentials cred = new Credentials(); - - @Before - public void setup() { - conf = new Configuration(); - } private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime, int attemptStartTime) { @@ -80,7 +60,13 @@ public class TestFirstInFirstOutComparator { VertexOrBinary.newBuilder().setVertex( SignableVertexSpec .newBuilder() - .setVertexIdentifier(Converters.createVertexIdentifier(taId, 0)) + .setQueryIdentifier( + QueryIdentifierProto.newBuilder() + .setApplicationIdString(appId.toString()) + .setAppAttemptNumber(0) + .setDagIndex(dagId.getId()) + .build()) + .setVertexIndex(vId.getId()) .setDagName("MockDag") .setVertexName("MockVertex") .setUser("MockUser") http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index fcf3378..718f6fd 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -70,10 +70,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -82,7 +84,6 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.serviceplugins.api.ContainerEndReason; -import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; @@ -107,7 +108,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private long deleteDelayOnDagComplete; private final LlapTaskUmbilicalProtocol umbilical; private final Token<LlapTokenIdentifier> token; - private final int appAttemptId; private final String user; // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. @@ -118,6 +118,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private final LlapRegistryService serviceRegistry; private volatile QueryIdentifierProto currentQueryIdentifierProto; + private volatile String currentHiveQueryId; public LlapTaskCommunicator( TaskCommunicatorContext taskCommunicatorContext) { @@ -143,7 +144,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { // TODO Avoid reading this from the environment user = System.getenv(ApplicationConstants.Environment.USER.name()); - appAttemptId = taskCommunicatorContext.getApplicationAttemptId().getAttemptId(); credentialMap = new ConcurrentHashMap<>(); sourceStateTracker = new SourceStateTracker(getContext(), this); @@ -262,8 +262,18 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); - if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIdentifier())) { - resetCurrentDag(dagId); + if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) { + // TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved + // once TEZ-2672 is fixed. + String hiveQueryId; + try { + hiveQueryId = extractQueryId(taskSpec); + } catch (IOException e) { + throw new RuntimeException("Failed to extract query id from task spec: " + taskSpec, e); + } + Preconditions.checkNotNull(hiveQueryId, "Unexpected null query id"); + + resetCurrentDag(dagId, hiveQueryId); } @@ -292,7 +302,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { SubmitWorkRequestProto requestProto; try { - requestProto = constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo); + requestProto = constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo, currentHiveQueryId); } catch (IOException e) { throw new RuntimeException("Failed to construct request", e); } @@ -388,7 +398,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself if (nodeId != null) { TerminateFragmentRequestProto request = - TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto) + TerminateFragmentRequestProto.newBuilder().setQueryIdentifier( + constructQueryIdentifierProto( + taskAttemptId.getTaskID().getVertexID().getDAGId().getId())) .setFragmentIdentifierString(taskAttemptId.toString()).build(); communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() { @@ -415,9 +427,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override public void dagComplete(final int dagIdentifier) { + QueryIdentifierProto queryIdentifierProto = constructQueryIdentifierProto(dagIdentifier); QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder() - .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier)) - .setDeleteDelay(deleteDelayOnDagComplete).build(); + .setQueryIdentifier(queryIdentifierProto).setDeleteDelay(deleteDelayOnDagComplete).build(); for (final LlapNodeId llapNodeId : nodesForQuery) { LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId); communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(), @@ -597,19 +609,29 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } } - private void resetCurrentDag(int newDagId) { + private void resetCurrentDag(int newDagId, String hiveQueryId) { // Working on the assumption that a single DAG runs at a time per AM. + currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId); - sourceStateTracker.resetState(newDagId); + currentHiveQueryId = hiveQueryId; + sourceStateTracker.resetState(currentQueryIdentifierProto); nodesForQuery.clear(); - LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagInfo().getName()); + LOG.info("CurrentDagId set to: " + newDagId + ", name=" + + getContext().getCurrentDagInfo().getName() + ", queryId=" + hiveQueryId); // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which // is likely already happening. } + private String extractQueryId(TaskSpec taskSpec) throws IOException { + UserPayload processorPayload = taskSpec.getProcessorDescriptor().getUserPayload(); + Configuration conf = TezUtils.createConfFromUserPayload(processorPayload); + return HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); + } + private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec, - FragmentRuntimeInfo fragmentRuntimeInfo) throws + FragmentRuntimeInfo fragmentRuntimeInfo, + String hiveQueryId) throws IOException { SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId()); @@ -618,7 +640,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { builder.setAmHost(getAddress().getHostName()); builder.setAmPort(getAddress().getPort()); - Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == + Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() == taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); if (credentialsBinary == null) { @@ -628,8 +650,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { credentialsBinary = credentialsBinary.duplicate(); } builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); - builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.convertTaskSpecToProto( - taskSpec, appAttemptId, getTokenIdentifier(), user)).build()); + builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.constructSignableVertexSpec( + taskSpec, currentQueryIdentifierProto, getTokenIdentifier(), user, hiveQueryId)).build()); // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments builder.setFragmentRuntimeInfo(fragmentRuntimeInfo); return builder.build(); @@ -843,7 +865,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) { return QueryIdentifierProto.newBuilder() - .setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier) + .setApplicationIdString(getContext().getCurrentAppIdentifier()).setDagIndex(dagIdentifier) + .setAppAttemptNumber(getContext().getApplicationAttemptId().getAttemptId()) .build(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java index 3dd73f6..2a66e4d 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -44,8 +44,6 @@ public class SourceStateTracker { private final TaskCommunicatorContext taskCommunicatorContext; private final LlapTaskCommunicator taskCommunicator; - private final QueryIdentifierProto BASE_QUERY_IDENTIFIER; - // Tracks vertices for which notifications have been registered private final Set<String> notificationRegisteredVertices = new HashSet<>(); @@ -58,19 +56,16 @@ public class SourceStateTracker { LlapTaskCommunicator taskCommunicator) { this.taskCommunicatorContext = taskCommunicatorContext; this.taskCommunicator = taskCommunicator; - BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder() - .setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build(); } /** * To be invoked after each DAG completes. */ - public synchronized void resetState(int newDagId) { + public synchronized void resetState(QueryIdentifierProto currentQueryIdentifierProto) { sourceInfoMap.clear(); nodeInfoMap.clear(); notificationRegisteredVertices.clear(); - this.currentQueryIdentifier = - QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build(); + this.currentQueryIdentifier = currentQueryIdentifierProto; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java index 1901328..0f28f70 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; @@ -351,11 +353,21 @@ public class TestLlapTaskCommunicator { private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int taskIdx) { TaskSpec taskSpec = mock(TaskSpec.class); + Configuration conf = new Configuration(false); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYID, "fakeQueryId"); + UserPayload userPayload; + try { + userPayload = TezUtils.createUserPayloadFromConf(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance( TezTaskID.getInstance(vertexId, taskIdx), 0); doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); doReturn(DAG_NAME).when(taskSpec).getDAGName(); doReturn(vertexName).when(taskSpec).getVertexName(); + ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload); + doReturn(processorDescriptor).when(taskSpec).getProcessorDescriptor(); return taskSpec; } } http://git-wip-us.apache.org/repos/asf/hive/blob/c5b4d66d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index bdf254b..0705e2e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.llap.Schema; import org.apache.hadoop.hive.llap.SubmitWorkInfo; import org.apache.hadoop.hive.llap.TypeDesc; import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.security.LlapSigner; import org.apache.hadoop.hive.llap.security.LlapSigner.Signable; @@ -370,8 +371,14 @@ public class GenericUDTFGetSplits extends GenericUDTF { // 2. Generate the vertex/submit information for all events. if (i == 0) { + // The queryId could either be picked up from the current request being processed, or + // generated. The current request isn't exactly correct since the query is 'done' once we + // return the results. Generating a new one has the added benefit of working once this + // is moved out of a UDTF into a proper API. + // Setting this to the generated AppId which is unique. // Despite the differences in TaskSpec, the vertex spec should be the same. - signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser); + signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser, + applicationId.toString()); } SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId, @@ -426,10 +433,12 @@ public class GenericUDTFGetSplits extends GenericUDTF { } private SignedMessage createSignedVertexSpec(LlapSigner signer, TaskSpec taskSpec, - ApplicationId applicationId, String queryUser) throws IOException { - - final SignableVertexSpec.Builder svsb = Converters.convertTaskSpecToProto( - taskSpec, 0, applicationId.toString(), queryUser); + ApplicationId applicationId, String queryUser, String queryIdString) throws IOException { + QueryIdentifierProto queryIdentifierProto = + QueryIdentifierProto.newBuilder().setApplicationIdString(applicationId.toString()) + .setDagIndex(taskSpec.getDagIdentifier()).setAppAttemptNumber(0).build(); + final SignableVertexSpec.Builder svsb = Converters.constructSignableVertexSpec( + taskSpec, queryIdentifierProto, applicationId.toString(), queryUser, queryIdString); if (signer == null) { SignedMessage result = new SignedMessage(); result.message = serializeVertexSpec(svsb);