Repository: hive
Updated Branches:
  refs/heads/master df51738ad -> 2de64b0b0
HIVE-14078: LLAP input split should get task attempt number from conf if available (Jason Dere, reviewed by Prasanth Jayachandran/Siddharth Seth)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2de64b0b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2de64b0b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2de64b0b

Branch: refs/heads/master
Commit: 2de64b0b0575264aa9716bc6fa824cd076884257
Parents: df51738
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Jun 30 13:14:03 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Jun 30 13:14:03 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapBaseInputFormat.java | 22 +++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/2de64b0b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index aef5762..c2fca54 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -59,6 +59,8 @@ import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -137,15 +139,25 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
     LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
         new LlapRecordReaderTaskUmbilicalExternalResponder();
-    // TODO: close this
     LlapTaskUmbilicalExternalClient llapClient =
       new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
           submitWorkInfo.getToken(), umbilicalResponder, llapToken);
     llapClient.init(job);
     llapClient.start();
 
+    int attemptNum = 0;
+    // Use task attempt number from conf if provided
+    TaskAttemptID taskAttemptId = TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID));
+    if (taskAttemptId != null) {
+      attemptNum = taskAttemptId.getId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting attempt number to " + attemptNum + " from task attempt ID in conf: " +
+            job.get(MRJobConfig.TASK_ATTEMPT_ID));
+      }
+    }
+
     SubmitWorkRequestProto request = constructSubmitWorkRequestProto(
-        submitWorkInfo, llapSplit.getSplitNum(), llapClient.getAddress(),
+        submitWorkInfo, llapSplit.getSplitNum(), attemptNum, llapClient.getAddress(),
         submitWorkInfo.getToken(), llapSplit.getFragmentBytes(),
         llapSplit.getFragmentBytesSignature());
     llapClient.submitWork(request, host, llapSubmitPort);
@@ -275,7 +287,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
   }
 
   private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
-      int taskNum, InetSocketAddress address, Token<JobTokenIdentifier> token,
+      int taskNum, int attemptNum, InetSocketAddress address, Token<JobTokenIdentifier> token,
       byte[] fragmentBytes, byte[] fragmentBytesSignature) throws IOException {
     ApplicationId appId = submitWorkInfo.getFakeAppId();
 
@@ -284,7 +296,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     LOG.info("Setting user in submitWorkRequest to: " + user);
 
     ContainerId containerId =
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, attemptNum), taskNum);
 
     // Credentials can change across DAGs. Ideally construct only once per DAG.
     Credentials credentials = new Credentials();
@@ -309,7 +321,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     }
     builder.setWorkSpec(vertexBuilder.build());
     builder.setFragmentNumber(taskNum);
-    builder.setAttemptNumber(0); // TODO: hmm
+    builder.setAttemptNumber(attemptNum);
     builder.setContainerIdString(containerId.toString());
     builder.setAmHost(address.getHostName());
     builder.setAmPort(address.getPort());