HIVE-13666. LLAP: Provide the log URL for a task attempt to display on the UI. (Siddharth Seth, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a5bc94c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a5bc94c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a5bc94c Branch: refs/heads/llap Commit: 0a5bc94c1cf65d66f793474a4818021f29e05c7e Parents: a16058e Author: Siddharth Seth <[email protected]> Authored: Tue May 3 20:58:17 2016 +0530 Committer: Siddharth Seth <[email protected]> Committed: Tue May 3 20:58:17 2016 +0530 ---------------------------------------------------------------------- .../llap/tezplugins/LlapTaskCommunicator.java | 47 ++++++++++++++++++++ 1 file changed, 47 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0a5bc94c/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index a4f5d4d..b4b041a 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; import 
org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; @@ -63,6 +65,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; @@ -108,6 +111,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<>(); private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>(); + private final LlapRegistryService serviceRegistry; + private volatile int currentDagId; private volatile QueryIdentifierProto currentQueryIdentifierProto; @@ -129,6 +134,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled()); + // Not closing this at the moment at shutdown, since this could be a shared instance. 
+ serviceRegistry = LlapRegistryService.getClient(conf); + umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder(); @@ -455,6 +463,45 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { }); } + // @Override - TODO Add the annotation after upgrading Hive to Tez 0.8.4 + public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { + return constructLogUrl(containerNodeId); + } + + // @Override - TODO Add the annotation after upgrading Hive to Tez 0.8.4 + public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { + return constructLogUrl(containerNodeId); + } + + private String constructLogUrl(NodeId containerNodeId) { + Set<ServiceInstance> instanceSet = null; + try { + instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost()); + } catch (IOException e) { + // Not failing the job due to a failure constructing the log url + LOG.warn( + "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", + containerNodeId, e.getMessage()); + return null; + } + if (instanceSet != null) { + ServiceInstance matchedInstance = null; + for (ServiceInstance instance : instanceSet) { + if (instance.getRpcPort() == containerNodeId.getPort()) { + matchedInstance = instance; + break; + } + } + if (matchedInstance != null) { + return constructLlapLogUrl(matchedInstance); + } + } + return null; + } + + private String constructLlapLogUrl(ServiceInstance serviceInstance) { + return serviceInstance.getServicesAddress() + "/logs"; + } private static class PingingNodeInfo { final AtomicLong logTimestamp;
