Repository: hive
Updated Branches:
  refs/heads/llap bf834079a -> 2e042cc15
HIVE-13138. Add client to communicate with interface, initial split setup. (Siddharth Seth and Vikram Dixit K) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f272acea Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f272acea Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f272acea Branch: refs/heads/llap Commit: f272aceaf7da77f9d87f5be42bb1520181035c2c Parents: bf83407 Author: Siddharth Seth <ss...@apache.org> Authored: Tue Feb 23 23:55:46 2016 -0800 Committer: Siddharth Seth <ss...@apache.org> Committed: Tue Feb 23 23:55:46 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../org/apache/hive/jdbc/LlapInputFormat.java | 10 + .../daemon/rpc/LlapDaemonProtocolProtos.java | 159 ++++++++++--- .../src/protobuf/LlapDaemonProtocol.proto | 8 + .../hive/llap/daemon/impl/LlapDaemon.java | 3 + .../llap/daemon/impl/TaskRunnerCallable.java | 5 +- .../ext/LlapTaskUmbilicalExternalClient.java | 197 ++++++++++++++++ .../helpers/LlapTaskUmbilicalServer.java | 57 +++++ .../hadoop/hive/llap/LlapInputFormat.java | 146 +++--------- .../apache/hadoop/hive/llap/LlapInputSplit.java | 80 ++++--- .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 65 ++++++ .../hive/ql/exec/tez/HiveSplitGenerator.java | 49 +++- .../hive/ql/exec/tez/MapRecordProcessor.java | 2 + .../hive/ql/parse/TypeCheckProcFactory.java | 3 + .../ql/udf/generic/GenericUDFGetSplits.java | 224 +++++++++++++++++-- .../org/apache/tez/dag/api/TaskSpecBuilder.java | 45 ++++ 16 files changed, 837 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7fbcbba..6a22890 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2687,6 +2687,9 @@ public class HiveConf extends Configuration { LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003, "LLAP daemon output service port"), + LLAP_TMP_SUBMITWORK_USING_TEZ_AM("hive.llap.tmp.submit.work.using.tez.am", true,""), + LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS("hive.llap.tmp.ext.client.num.server.handlers", 1, ""), + SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), "Timeout for requests from Hive client to remote Spark driver."), http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java index 97fe2c5..c38dd82 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java +++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java @@ -59,6 +59,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma private String pwd; // "" private String query; + public final String URL_KEY = "llap.if.hs2.connection"; + public final String QUERY_KEY = "llap.if.query"; + public final String USER_KEY = "llap.if.user"; + public final String PWD_KEY = 
"llap.if.pwd"; + private Connection con; private Statement stmt; @@ -133,6 +138,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { List<InputSplit> ins = new ArrayList<InputSplit>(); + if (url == null) url = job.get(URL_KEY); + if (query == null) query = job.get(QUERY_KEY); + if (user == null) user = job.get(USER_KEY); + if (pwd == null) pwd = job.get(PWD_KEY); + if (url == null || query == null) { throw new IllegalStateException(); } http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java ---------------------------------------------------------------------- diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index 4ab7b32..653e7e0 100644 --- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -7334,6 +7334,16 @@ public final class LlapDaemonProtocolProtos { * <code>optional .FragmentRuntimeInfo fragment_runtime_info = 10;</code> */ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfoOrBuilder getFragmentRuntimeInfoOrBuilder(); + + // optional bool usingTezAm = 11 [default = true]; + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + boolean hasUsingTezAm(); + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + boolean getUsingTezAm(); } /** * Protobuf type {@code SubmitWorkRequestProto} @@ -7452,6 +7462,11 @@ public final class LlapDaemonProtocolProtos { bitField0_ |= 0x00000200; break; } + case 88: { + bitField0_ |= 0x00000400; + usingTezAm_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -7799,6 +7814,22 @@ public final class LlapDaemonProtocolProtos { return fragmentRuntimeInfo_; } + // optional bool usingTezAm = 11 [default = true]; + public static final int USINGTEZAM_FIELD_NUMBER = 11; + private boolean usingTezAm_; + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean hasUsingTezAm() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean getUsingTezAm() { + return usingTezAm_; + } + private void initFields() { containerIdString_ = ""; amHost_ = ""; @@ -7810,6 +7841,7 @@ public final class LlapDaemonProtocolProtos { appAttemptNumber_ = 0; fragmentSpec_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance(); fragmentRuntimeInfo_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo.getDefaultInstance(); + usingTezAm_ = true; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7853,6 +7885,9 @@ public final class LlapDaemonProtocolProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeMessage(10, fragmentRuntimeInfo_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeBool(11, usingTezAm_); + } getUnknownFields().writeTo(output); } @@ -7902,6 +7937,10 @@ public final class LlapDaemonProtocolProtos { size += 
com.google.protobuf.CodedOutputStream .computeMessageSize(10, fragmentRuntimeInfo_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(11, usingTezAm_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -7975,6 +8014,11 @@ public final class LlapDaemonProtocolProtos { result = result && getFragmentRuntimeInfo() .equals(other.getFragmentRuntimeInfo()); } + result = result && (hasUsingTezAm() == other.hasUsingTezAm()); + if (hasUsingTezAm()) { + result = result && (getUsingTezAm() + == other.getUsingTezAm()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8028,6 +8072,10 @@ public final class LlapDaemonProtocolProtos { hash = (37 * hash) + FRAGMENT_RUNTIME_INFO_FIELD_NUMBER; hash = (53 * hash) + getFragmentRuntimeInfo().hashCode(); } + if (hasUsingTezAm()) { + hash = (37 * hash) + USINGTEZAM_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getUsingTezAm()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -8167,6 +8215,8 @@ public final class LlapDaemonProtocolProtos { fragmentRuntimeInfoBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000200); + usingTezAm_ = true; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -8243,6 +8293,10 @@ public final class LlapDaemonProtocolProtos { } else { result.fragmentRuntimeInfo_ = fragmentRuntimeInfoBuilder_.build(); } + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.usingTezAm_ = usingTezAm_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -8299,6 +8353,9 @@ public final class LlapDaemonProtocolProtos { if (other.hasFragmentRuntimeInfo()) { mergeFragmentRuntimeInfo(other.getFragmentRuntimeInfo()); } + if (other.hasUsingTezAm()) { + setUsingTezAm(other.getUsingTezAm()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -9032,6 +9089,39 @@ public final class LlapDaemonProtocolProtos { return fragmentRuntimeInfoBuilder_; } + // optional bool usingTezAm = 11 [default = true]; + private boolean usingTezAm_ = true; + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean hasUsingTezAm() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean getUsingTezAm() { + return usingTezAm_; + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public Builder setUsingTezAm(boolean value) { + bitField0_ |= 0x00000400; + usingTezAm_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public Builder clearUsingTezAm() { + bitField0_ = (bitField0_ & ~0x00000400); + usingTezAm_ = true; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:SubmitWorkRequestProto) } @@ -14392,7 +14482,7 @@ public final class LlapDaemonProtocolProtos { "\030\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003" + "\022\"\n\032current_attempt_start_time\030\006 \001(\003\"F\n\024" + "QueryIdentifierProto\022\026\n\016app_identifier\030\001" + - " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\266\002\n\026SubmitW" + + " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\320\002\n\026SubmitW" + "orkRequestProto\022\033\n\023container_id_string\030\001" + " \001(\t\022\017\n\007am_host\030\002 
\001(\t\022\017\n\007am_port\030\003 \001(\005\022\030", "\n\020token_identifier\030\004 \001(\t\022\032\n\022credentials_" + @@ -14400,38 +14490,39 @@ public final class LlapDaemonProtocolProtos { "n_id_string\030\007 \001(\t\022\032\n\022app_attempt_number\030" + "\010 \001(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentS" + "pecProto\0223\n\025fragment_runtime_info\030\n \001(\0132" + - "\024.FragmentRuntimeInfo\"J\n\027SubmitWorkRespo" + - "nseProto\022/\n\020submission_state\030\001 \001(\0162\025.Sub" + - "missionStateProto\"\205\001\n\036SourceStateUpdated" + - "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" + - ".QueryIdentifierProto\022\020\n\010src_name\030\002 \001(\t\022", - " \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Sou" + - "rceStateUpdatedResponseProto\"w\n\031QueryCom" + - "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022/\n\020q" + - "uery_identifier\030\002 \001(\0132\025.QueryIdentifierP" + - "roto\022\027\n\014delete_delay\030\004 \001(\003:\0010\"\034\n\032QueryCo" + - "mpleteResponseProto\"t\n\035TerminateFragment" + - "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" + - ".QueryIdentifierProto\022\"\n\032fragment_identi" + - "fier_string\030\002 \001(\t\" \n\036TerminateFragmentRe" + - "sponseProto\"\026\n\024GetTokenRequestProto\"&\n\025G", - "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S" + - "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" + - "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" + - "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" + - "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" + - "bmitWorkRequestProto\032\030.SubmitWorkRespons" + - "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" + - "teUpdatedRequestProto\032 .SourceStateUpdat" + - "edResponseProto\022H\n\rqueryComplete\022\032.Query" + - "CompleteRequestProto\032\033.QueryCompleteResp", - "onseProto\022T\n\021terminateFragment\022\036.Termina" + - "teFragmentRequestProto\032\037.TerminateFragme" + - "ntResponseProto2]\n\026LlapManagementProtoco" + - "l\022C\n\022getDelegationToken\022\025.GetTokenReques" + - "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" + - "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" + - "emonProtocolProtos\210\001\001\240\001\001" + "\024.FragmentRuntimeInfo\022\030\n\nusingTezAm\030\013 \001(" + + "\010:\004true\"J\n\027SubmitWorkResponseProto\022/\n\020su" + + "bmission_state\030\001 \001(\0162\025.SubmissionStatePr" + + "oto\"\205\001\n\036SourceStateUpdatedRequestProto\022/" + + "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi", + "erProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\016" + + "2\021.SourceStateProto\"!\n\037SourceStateUpdate" + + "dResponseProto\"w\n\031QueryCompleteRequestPr" + + "oto\022\020\n\010query_id\030\001 \001(\t\022/\n\020query_identifie" + + "r\030\002 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete" + + "_delay\030\004 \001(\003:\0010\"\034\n\032QueryCompleteResponse" + + "Proto\"t\n\035TerminateFragmentRequestProto\022/" + + "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi" + + "erProto\022\"\n\032fragment_identifier_string\030\002 " + + "\001(\t\" \n\036TerminateFragmentResponseProto\"\026\n", + "\024GetTokenRequestProto\"&\n\025GetTokenRespons" + + "eProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStateProt" + + "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Sub" + + 
"missionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJEC" + + "TED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonP" + + "rotocol\022?\n\nsubmitWork\022\027.SubmitWorkReques" + + "tProto\032\030.SubmitWorkResponseProto\022W\n\022sour" + + "ceStateUpdated\022\037.SourceStateUpdatedReque" + + "stProto\032 .SourceStateUpdatedResponseProt" + + "o\022H\n\rqueryComplete\022\032.QueryCompleteReques", + "tProto\032\033.QueryCompleteResponseProto\022T\n\021t" + + "erminateFragment\022\036.TerminateFragmentRequ" + + "estProto\032\037.TerminateFragmentResponseProt" + + "o2]\n\026LlapManagementProtocol\022C\n\022getDelega" + + "tionToken\022\025.GetTokenRequestProto\032\026.GetTo" + + "kenResponseProtoBH\n&org.apache.hadoop.hi" + + "ve.llap.daemon.rpcB\030LlapDaemonProtocolPr" + + "otos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -14485,7 +14576,7 @@ public final class LlapDaemonProtocolProtos { internal_static_SubmitWorkRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkRequestProto_descriptor, - new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", }); + new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", "UsingTezAm", }); internal_static_SubmitWorkResponseProto_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_SubmitWorkResponseProto_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-common/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 944c96c..e964c5f 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -91,6 +91,7 @@ message SubmitWorkRequestProto { optional int32 app_attempt_number = 8; optional FragmentSpecProto fragment_spec = 9; optional FragmentRuntimeInfo fragment_runtime_info = 10; + optional bool usingTezAm = 11 [default = true]; } enum SubmissionStateProto { @@ -136,11 +137,18 @@ message GetTokenResponseProto { optional bytes token = 1; } +message SendEventsRequestProto { +} + +message SendEventsResponseProto { +} + service LlapDaemonProtocol { rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto); rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto); rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto); rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto); + rpc sendEvents(SendEventsRequestProto) return (SendEventsResponseProto); } service LlapManagementProtocol { http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 8621826..40a89cb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; +import org.apache.hadoop.hive.llap.LlapOutputFormatService; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryPoolMXBean; @@ -279,6 +280,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort()); this.shufflePort.set(ShuffleHandler.get().getPort()); super.serviceStart(); + LlapOutputFormatService.get(); LOG.info("LlapDaemon serviceStart complete"); } @@ -286,6 +288,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla super.serviceStop(); ShuffleHandler.shutdown(); shutdown(); + LlapOutputFormatService.get().stop(); LOG.info("LlapDaemon shutdown complete"); } http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index d88d82a..d9d216d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -103,6 +103,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private final String queryId; private final HadoopShim tezHadoopShim; private boolean shouldRunTask = true; + private final boolean withTezAm; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); private final AtomicBoolean isStarted = new AtomicBoolean(false); @@ -131,11 +132,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.jobToken = TokenCache.getSessionToken(credentials); this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec()); this.amReporter = amReporter; + this.withTezAm = request.getUsingTezAm(); + LOG.warn("ZZZ: DBG: usingTezAm=" + withTezAm); // Register with the AMReporter when the callable is setup. Unregister once it starts running. - if (jobToken != null) { this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier()); - } this.metrics = metrics; this.requestId = request.getFragmentSpec().getFragmentIdentifierString(); // TODO Change this to the queryId/Name when that's available. 
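For reference, the external-client path introduced below is meant to run with the new usingTezAm flag turned off, since there is no Tez AM behind LlapTaskUmbilicalExternalClient; its submitWork() asserts getUsingTezAm() == false. The following is a minimal sketch, not part of this patch, of how such a request could be assembled with the regenerated protobuf builder. The host, port and user values are placeholders, and the fragment spec, credentials and runtime info that a real request carries are omitted:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;

    public class SubmitWorkRequestSketch {
      // Builds a request for the external (no Tez AM) path. Real requests also carry
      // the fragment spec, credentials and fragment runtime info produced by the split machinery.
      static SubmitWorkRequestProto buildRequest(InetSocketAddress umbilicalAddr, String user) {
        return SubmitWorkRequestProto.newBuilder()
            .setAmHost(umbilicalAddr.getHostName()) // address of LlapTaskUmbilicalServer, not a Tez AM
            .setAmPort(umbilicalAddr.getPort())
            .setUser(user)
            .setAppAttemptNumber(0)
            .setUsingTezAm(false)                   // new optional field; defaults to true
            .build();
      }
    }
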
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java new file mode 100644 index 0000000..ecc032d --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -0,0 +1,197 @@ +package org.apache.hadoop.hive.llap.ext; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; +import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.AbstractService; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapTaskUmbilicalExternalClient extends AbstractService { + + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class); + + private final LlapProtocolClientProxy communicator; + private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer; + private final Configuration conf; + private final LlapTaskUmbilicalProtocol umbilical; + + protected final String tokenIdentifier; + protected final Token<JobTokenIdentifier> sessionToken; + + + private final ConcurrentMap<String, List<TezEvent>> pendingEvents = new ConcurrentHashMap<>(); + + + // TODO KKK Work out the details of the tokenIdentifier, and the session token. + // It may just be possible to create one here - since Shuffle is not involved, and this is only used + // for communication from LLAP-Daemons to the server. It will need to be sent in as part + // of the job submission request. + public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken) { + super(LlapTaskUmbilicalExternalClient.class.getName()); + this.conf = conf; + this.umbilical = new LlapTaskUmbilicalExternalImpl(); + this.tokenIdentifier = tokenIdentifier; + this.sessionToken = sessionToken; + // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough. 
+ this.communicator = new LlapProtocolClientProxy(1, conf, null); + } + + @Override + public void serviceStart() throws IOException { + int numHandlers = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS); + llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken); + } + + @Override + public void serviceStop() { + llapTaskUmbilicalServer.shutdownServer(); + if (this.communicator != null) { + this.communicator.stop(); + } + } + + + /** + * Submit the work for actual execution. This should always have the usingTezAm flag disabled + * @param submitWorkRequestProto + */ + public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort) { + Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false); + + // Store the actual event first. To be returned on the first heartbeat. + Event mrInputEvent = null; + // Construct a TezEvent out of this, to send it out on the next heaertbeat + +// submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + + + // Send out the actual SubmitWorkRequest + communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort, + new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() { + @Override + public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) { + if (response.hasSubmissionState()) { + if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) { + LOG.info("Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy."); + return; + } + } + LOG.info("DBG: Submitted " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()); + } + + @Override + public void indicateError(Throwable t) { + LOG.error("Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), t); + } + }); + + + + +// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment. +// // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable +// QueryIdentifierProto queryIdentifier = QueryIdentifierProto +// .newBuilder() +// .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId()) +// .build(); +// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest = +// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState( +// LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED). +// setSrcName(TODO) +// communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set); + + + } + + + + + + + + // TODO Ideally, the server should be shared across all client sessions running on the same node. + private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol { + + @Override + public boolean canCommit(TezTaskAttemptID taskid) throws IOException { + // Expecting only a single instance of a task to be running. + return true; + } + + @Override + public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, + TezException { + // Keep-alive information. 
The client should be informed and will have to take care of re-submitting the work. + // Some parts of fault tolerance go here. + + // This also provides completion information, and a possible notification when task actually starts running (first heartbeat) + + // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans. + + + TezHeartbeatResponse response = new TezHeartbeatResponse(); + // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this. + TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); + LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString()); + + List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString()); + + response.setLastRequestId(request.getRequestId()); + // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task. + // Also since we have all the MRInput events here - they'll all be sent in together. + response.setNextFromEventId(0); // Irrelevant. See comment above. + response.setNextPreRoutedEventId(0); //Irrelevant. See comment above. + response.setEvents(tezEvents); + + // TODO KKK: Should ideally handle things like Task success notifications. + // Can this somehow be hooked into the LlapTaskCommunicator to make testing easy + + return response; + } + + @Override + public void nodeHeartbeat(Text hostname, int port) throws IOException { + // TODO Eventually implement - to handle keep-alive messages from pending work. + } + + @Override + public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException { + // TODO Eventually implement - to handle preemptions within LLAP daemons. + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return 0; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, + clientVersion, clientMethodsHash); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java new file mode 100644 index 0000000..dbd591a --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java @@ -0,0 +1,57 @@ +package org.apache.hadoop.hive.llap.tezplugins.helpers; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapTaskUmbilicalServer { + + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class); + + protected volatile Server server; + private final InetSocketAddress address; 
+ private final AtomicBoolean started = new AtomicBoolean(true); + + public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws + IOException { + JobTokenSecretManager jobTokenSecretManager = + new JobTokenSecretManager(); + jobTokenSecretManager.addTokenForJob(tokenIdentifier, token); + + server = new RPC.Builder(conf) + .setProtocol(LlapTaskUmbilicalProtocol.class) + .setBindAddress("0.0.0.0") + .setPort(0) + .setInstance(umbilical) + .setNumHandlers(numHandlers) + .setSecretManager(jobTokenSecretManager).build(); + + server.start(); + this.address = NetUtils.getConnectAddress(server); + LOG.info( + "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address + + " with numHandlers=" + numHandlers); + } + + public InetSocketAddress getAddress() { + return this.address; + } + + public void shutdownServer() { + if (started.get()) { // Primarily to avoid multiple shutdowns. + started.set(false); + server.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java index 4db4d32..d308ec8 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java @@ -18,10 +18,17 @@ package org.apache.hadoop.hive.llap; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.Set; import javax.security.auth.login.LoginException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TaskSpecBuilder; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +45,8 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; import java.io.FileNotFoundException; +import java.util.UUID; + import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; @@ -82,15 +91,14 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; - import com.google.common.base.Preconditions; public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); - private TezWork work; - private Schema schema; + private final TezWork work; + private final Schema schema; public LlapInputFormat(TezWork tezWork, Schema schema) { this.work = tezWork; @@ -98,22 +106,36 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma } // need empty constructor for bean instantiation - public LlapInputFormat() {} + public LlapInputFormat() { + // None of these fields should be required during getRecordReader, + // and should not be read. 
+ work = null; + schema = null; + } /* * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire * off the work in the split to LLAP and finally return the connected socket back in an * LlapRecordReader. The LlapRecordReader class reads the results from the socket. */ + @Override public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + // Calls a static method to ensure none of the object fields are read. + return _getRecordReader(split, job, reporter); + } + + private static RecordReader _getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws + IOException { LlapInputSplit llapSplit = (LlapInputSplit)split; // TODO: push event into LLAP // this is just the portion that sets up the io to receive data String host = split.getLocations()[0]; - String id = job.get(LlapOutputFormat.LLAP_OF_ID_KEY); + + // TODO: need to construct id here. Format is queryId + "_" + taskIndex + String id = "foobar"; HiveConf conf = new HiveConf(); Socket socket = new Socket(host, @@ -130,120 +152,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); } - /* - * getSplits() gets called as part of the GenericUDFGetSplits call to get splits. Here we create - * an array of input splits from the work item we have, figure out the location for llap and pass - * that back for the submission. getRecordReader method above uses that split info to assign the - * work to llap. - */ @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - // TODO: need to build proto of plan - - DAG dag = DAG.create(work.getName()); - dag.setCredentials(job.getCredentials()); - // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag); - - DagUtils utils = DagUtils.getInstance(); - Context ctx = new Context(job); - MapWork mapWork = (MapWork) work.getAllWork().get(0); - // bunch of things get setup in the context based on conf but we need only the MR tmp directory - // for the following method. - JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork); - Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job); - FileSystem fs = scratchDir.getFileSystem(job); - try { - LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job); - Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr, - new ArrayList<LocalResource>(), fs, ctx, false, work, - work.getVertexType(mapWork)); - dag.addVertex(wx); - utils.addCredentials(mapWork, dag); - - // we have the dag now proceed to get the splits: - HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null); - splitGenerator.initializeSplitGenerator(wxConf, mapWork); - List<Event> eventList = splitGenerator.initialize(); - - // hack - just serializing with kryo for now. 
This needs to be done properly - InputSplit[] result = new InputSplit[eventList.size()]; - int i = 0; - ByteArrayOutputStream bos = new ByteArrayOutputStream(10240); - - InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) - eventList.remove(0); - - List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints(); - for (Event event: eventList) { - TaskLocationHint hint = hints.remove(0); - Set<String> hosts = hint.getHosts(); - SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()]; - - int j = 0; - for (String host: hosts) { - locations[j++] = new SplitLocationInfo(host,false); - } - - bos.reset(); - Kryo kryo = SerializationUtilities.borrowKryo(); - SerializationUtilities.serializeObjectByKryo(kryo, event, bos); - SerializationUtilities.releaseKryo(kryo); - result[i++] = new LlapInputSplit(bos.toByteArray(), locations, schema); - } - return result; - } catch (Exception e) { - throw new IOException(e); - } - } - - /** - * Returns a local resource representing a jar. This resource will be used to execute the plan on - * the cluster. - * - * @param localJarPath - * Local path to the jar to be localized. - * @return LocalResource corresponding to the localized hive exec resource. - * @throws IOException - * when any file system related call fails. - * @throws LoginException - * when we are unable to determine the user. - * @throws URISyntaxException - * when current jar location cannot be determined. - */ - private LocalResource createJarLocalResource(String localJarPath, DagUtils utils, - Configuration conf) - throws IOException, LoginException, IllegalArgumentException, FileNotFoundException { - FileStatus destDirStatus = utils.getHiveJarDirectory(conf); - assert destDirStatus != null; - Path destDirPath = destDirStatus.getPath(); - - Path localFile = new Path(localJarPath); - String sha = getSha(localFile, conf); - - String destFileName = localFile.getName(); - - // Now, try to find the file based on SHA and name. Currently we require exact name match. - // We could also allow cutting off versions and other stuff provided that SHA matches... - destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha - + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName); - - // TODO: if this method is ever called on more than one jar, getting the dir and the - // list need to be refactored out to be done only once. 
- Path destFile = new Path(destDirPath.toString() + "/" + destFileName); - return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); - } - - private String getSha(Path localFile, Configuration conf) - throws IOException, IllegalArgumentException { - InputStream is = null; - try { - FileSystem localFs = FileSystem.getLocal(conf); - is = localFs.open(localFile); - return DigestUtils.sha256Hex(is); - } finally { - if (is != null) { - is.close(); - } - } + throw new IOException("These are not the splits you are looking for."); } } http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java index 78dbb34..4249a16 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java @@ -16,49 +16,49 @@ */ package org.apache.hadoop.hive.llap; -import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; -import java.io.DataInputStream; -import java.io.ByteArrayInputStream; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.hive.llap.io.api.LlapProxy; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; +import java.io.IOException; + import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.AutoExpandingBufferWriteTransport; import org.apache.thrift.transport.AutoExpandingBuffer; - -import com.google.common.base.Preconditions; +import org.apache.thrift.transport.AutoExpandingBufferWriteTransport; public class LlapInputSplit implements InputSplitWithLocationInfo { - byte[] queryFragment; + byte[] planBytes; + byte[] fragmentBytes; SplitLocationInfo[] locations; Schema schema; - public LlapInputSplit() {} - public LlapInputSplit(byte[] queryFragment, SplitLocationInfo[] locations, Schema schema) { - this.queryFragment = queryFragment; + // // Static + // ContainerIdString + // DagName + // VertexName + // FragmentNumber + // AttemptNumber - always 0 + // FragmentIdentifierString - taskAttemptId + + // ProcessorDescsriptor + // InputSpec + // OutputSpec + + // Tokens + + // // Dynamic + // + + public LlapInputSplit() { + } + + public LlapInputSplit(byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) { + this.planBytes = planBytes; + this.fragmentBytes = fragmentBytes; this.locations = locations; this.schema = 
schema; } @@ -83,8 +83,11 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { @Override public void write(DataOutput out) throws IOException { - out.writeInt(queryFragment.length); - out.write(queryFragment); + out.writeInt(planBytes.length); + out.write(planBytes); + + out.writeInt(fragmentBytes.length); + out.write(fragmentBytes); out.writeInt(locations.length); for (int i = 0; i < locations.length; ++i) { @@ -108,11 +111,13 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { @Override public void readFields(DataInput in) throws IOException { - byte[] queryFragment; - int length = in.readInt(); - queryFragment = new byte[length]; - in.readFully(queryFragment); + planBytes = new byte[length]; + in.readFully(planBytes); + + length = in.readInt(); + fragmentBytes = new byte[length]; + in.readFully(fragmentBytes); length = in.readInt(); locations = new SplitLocationInfo[length]; @@ -124,7 +129,8 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { length = in.readInt(); try { - AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(length, 2d); + AutoExpandingBufferWriteTransport transport = + new AutoExpandingBufferWriteTransport(length, 2d); AutoExpandingBuffer buf = transport.getBuf(); in.readFully(buf.array(), 0, length); http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java new file mode 100644 index 0000000..a9a3738 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hive.llap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.runtime.api.impl.TaskSpec; + +public class SubmitWorkInfo implements Writable { + + private TaskSpec taskSpec; + private ApplicationId fakeAppId; + + public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId) { + this.taskSpec = taskSpec; + this.fakeAppId = fakeAppId; + } + + // Empty constructor for writable etc. 
+ public SubmitWorkInfo() { + } + + public TaskSpec getTaskSpec() { + return taskSpec; + } + + public ApplicationId getFakeAppId() { + return fakeAppId; + } + + @Override + public void write(DataOutput out) throws IOException { + taskSpec.write(out); + out.writeLong(fakeAppId.getClusterTimestamp()); + out.writeInt(fakeAppId.getId()); + } + + @Override + public void readFields(DataInput in) throws IOException { + taskSpec = new TaskSpec(); + taskSpec.readFields(in); + long appIdTs = in.readLong(); + int appIdId = in.readInt(); + fakeAppId = ApplicationId.newInstance(appIdTs, appIdId); + } + + public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + submitWorkInfo.write(dob); + return dob.getData(); + } + + public SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException { + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length); + SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(); + submitWorkInfo.readFields(dib); + return submitWorkInfo; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index b0cda82..011e459 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -33,6 +33,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.SerDeException; @@ -44,6 +47,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.split.SplitLocationProvider; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.VertexLocationHint; @@ -82,10 +86,30 @@ public class HiveSplitGenerator extends InputInitializer { private final SplitGrouper splitGrouper = new SplitGrouper(); private SplitLocationProvider splitLocationProvider = null; - public void initializeSplitGenerator(Configuration conf, MapWork work) { + + // TODO RSHACK This entire method needs to be reworked. Put back final fields, separate into reusable components etc. + public void initializeSplitGenerator(Configuration conf, MapWork work) throws IOException { + this.conf = conf; this.work = work; - this.jobConf = new JobConf(conf); + + // TODO RSHACK - assuming grouping enabled always. + userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build(); + + this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); + LOG.info("SplitLocationProvider: " + splitLocationProvider); + + // Read all credentials into the credentials instance stored in JobConf. 
+ ShimLoader.getHadoopShims().getMergedCredentials(jobConf); + + this.work = Utilities.getMapWork(jobConf); + + // Events can start coming in the moment the InputInitializer is created. The pruner + // must be setup and initialized here so that it sets up it's structures to start accepting events. + // Setting it up in initialize leads to a window where events may come in before the pruner is + // initialized, which may cause it to drop events. + // TODO RSHACK - No dynamic partition pruning + pruner = null; } public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException, @@ -129,7 +153,9 @@ public class HiveSplitGenerator extends InputInitializer { conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true); // perform dynamic partition pruning - pruner.prune(); + if (pruner != null) { + pruner.prune(); + } InputSplitInfoMem inputSplitInfo = null; boolean generateConsistentSplits = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS); @@ -142,9 +168,20 @@ public class HiveSplitGenerator extends InputInitializer { (InputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(realInputFormatName), jobConf); - int totalResource = getContext().getTotalAvailableResource().getMemory(); - int taskResource = getContext().getVertexTaskResource().getMemory(); - int availableSlots = totalResource / taskResource; + int totalResource = 0; + int taskResource = 0; + int availableSlots = 0; + // FIXME. Do the right thing Luke. + if (getContext() == null) { + // for now, totalResource = taskResource for llap + availableSlots = 1; + } + + if (getContext() != null) { + totalResource = getContext().getTotalAvailableResource().getMemory(); + taskResource = getContext().getVertexTaskResource().getMemory(); + availableSlots = totalResource / taskResource; + } if (HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 1) <= 1) { // broken configuration from mapred-default.xml http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 0584ad8..3fe70ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.llap.LlapOutputFormat; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; @@ -94,6 +95,7 @@ public class MapRecordProcessor extends RecordProcessor { super(jconf, context); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); if (LlapProxy.isDaemon()) { // do not cache plan + jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, queryId + "_" + context.getTaskIndex()); cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); } else { cache = ObjectCacheFactory.getCache(jconf, queryId); http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java index 598520c..0997233 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java @@ -1315,12 +1315,15 @@ public class TypeCheckProcFactory { try { return getXpathOrFuncExprNodeDesc(expr, isFunction, children, ctx); } catch (UDFArgumentTypeException e) { + LOG.error("UDFArgumentTypeException: ", e); throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_TYPE.getMsg(expr .getChild(childrenBegin + e.getArgumentId()), e.getMessage())); } catch (UDFArgumentLengthException e) { + LOG.error("UDFArgumentLengthException: ", e); throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_LENGTH.getMsg( expr, e.getMessage())); } catch (UDFArgumentException e) { + LOG.error("UDFArgumentException: ", e); throw new SemanticException(ErrorMsg.INVALID_ARGUMENT.getMsg(expr, e .getMessage())); } http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java index 3b7dcd9..9c7e1f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java @@ -18,6 +18,20 @@ package org.apache.hadoop.hive.ql.udf.generic; +import org.apache.hadoop.hive.llap.LlapInputSplit; +import org.apache.hadoop.hive.llap.SubmitWorkInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; + +import javax.security.auth.login.LoginException; + import java.util.Arrays; import java.util.List; import java.util.ArrayList; @@ -28,6 +42,17 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.DataOutput; +import com.esotericsoftware.kryo.Kryo; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.io.InputStream; + +import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TaskSpecBuilder; +import org.apache.tez.runtime.api.impl.TaskSpec; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +94,55 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.metastore.api.Schema; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.io.FileNotFoundException; +import java.util.UUID; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import 
org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.runtime.api.Event; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; +import org.apache.tez.mapreduce.hadoop.MRInputHelpers; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputInitializerEvent; /** * GenericUDFGetSplits. @@ -177,7 +251,6 @@ public class GenericUDFGetSplits extends GenericUDF { } Path data = null; - InputFormat inp = null; String ifc = null; TezWork tezWork = ((TezTask)roots.get(0)).getWork(); @@ -214,33 +287,13 @@ public class GenericUDFGetSplits extends GenericUDF { } tezWork = ((TezTask)roots.get(0)).getWork(); - - // Table table = db.getTable(tableName); - // if (table.isPartitioned()) { - // throw new UDFArgumentException("Table " + tableName + " is partitioned."); - // } - // data = table.getDataLocation(); - // LOG.info("looking at: "+data); - - // ifc = table.getInputFormatClass().toString(); - - // inp = ReflectionUtils.newInstance(table.getInputFormatClass(), jc); } MapWork w = (MapWork)tezWork.getAllWork().get(0); - inp = new LlapInputFormat(tezWork, schema); ifc = LlapInputFormat.class.toString(); try { - if (inp instanceof JobConfigurable) { - ((JobConfigurable) inp).configure(jc); - } - - if (inp instanceof FileInputFormat) { - ((FileInputFormat) inp).addInputPath(jc, data); - } - - for (InputSplit s: inp.getSplits(jc, num)) { + for (InputSplit s: getSplits(jc, num, tezWork, schema)) { Object[] os = new Object[3]; os[0] = ifc; os[1] = s.getClass().toString(); @@ -257,6 +310,133 @@ public class GenericUDFGetSplits extends GenericUDF { return retArray; } + public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema) throws IOException { + DAG dag = DAG.create(work.getName()); + dag.setCredentials(job.getCredentials()); + // TODO: set access control? 
TezTask.setAccessControlsForCurrentUser(dag); + + DagUtils utils = DagUtils.getInstance(); + Context ctx = new Context(job); + MapWork mapWork = (MapWork) work.getAllWork().get(0); + // bunch of things get setup in the context based on conf but we need only the MR tmp directory + // for the following method. + JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork); + Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job); + FileSystem fs = scratchDir.getFileSystem(job); + try { + LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job); + Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr, + new ArrayList<LocalResource>(), fs, ctx, false, work, + work.getVertexType(mapWork)); + String vertexName = wx.getName(); + dag.addVertex(wx); + utils.addCredentials(mapWork, dag); + + // we have the dag now proceed to get the splits: + HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null); + Preconditions.checkState(HiveConf.getBoolVar(wxConf, + HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS)); + Preconditions.checkState(HiveConf.getBoolVar(wxConf, + HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS)); + splitGenerator.initializeSplitGenerator(wxConf, mapWork); + List<Event> eventList = splitGenerator.initialize(); + + // hack - just serializing with kryo for now. This needs to be done properly + InputSplit[] result = new InputSplit[eventList.size()]; + ByteArrayOutputStream bos = new ByteArrayOutputStream(10240); + + InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0); + + List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints(); + + Preconditions.checkState(hints.size() == eventList.size() -1); + + LOG.info("DBG: Number of splits: " + (eventList.size() - 1)); + for (int i = 1 ; i < eventList.size() ; i++) { + // Creating the TezEvent here itself, since it's easy to serialize. + Event event = eventList.get(i); + TaskLocationHint hint = hints.get(i-1); + Set<String> hosts = hint.getHosts(); + LOG.info("DBG: Using locations: " + hosts.toString()); + if (hosts.size() != 1) { + LOG.warn("DBG: Bad # of locations: " + hosts.size()); + } + SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()]; + + int j = 0; + for (String host : hosts) { + locations[j++] = new SplitLocationInfo(host, false); + } + + bos.reset(); + Kryo kryo = SerializationUtilities.borrowKryo(); + SerializationUtilities.serializeObjectByKryo(kryo, event, bos); + SerializationUtilities.releaseKryo(kryo); + + TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1); + ApplicationId fakeApplicationId = ApplicationId.newInstance(new Random().nextInt(), 0); + SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(taskSpec, fakeApplicationId); + byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); + + result[i-1] = new LlapInputSplit(submitWorkBytes, bos.toByteArray(), locations, schema); + } + return result; + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Returns a local resource representing a jar. This resource will be used to execute the plan on + * the cluster. + * + * @param localJarPath + * Local path to the jar to be localized. + * @return LocalResource corresponding to the localized hive exec resource. + * @throws IOException + * when any file system related call fails. + * @throws LoginException + * when we are unable to determine the user. 
+ * @throws URISyntaxException + * when current jar location cannot be determined. + */ + private LocalResource createJarLocalResource(String localJarPath, DagUtils utils, + Configuration conf) + throws IOException, LoginException, IllegalArgumentException, FileNotFoundException { + FileStatus destDirStatus = utils.getHiveJarDirectory(conf); + assert destDirStatus != null; + Path destDirPath = destDirStatus.getPath(); + + Path localFile = new Path(localJarPath); + String sha = getSha(localFile, conf); + + String destFileName = localFile.getName(); + + // Now, try to find the file based on SHA and name. Currently we require exact name match. + // We could also allow cutting off versions and other stuff provided that SHA matches... + destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha + + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName); + + // TODO: if this method is ever called on more than one jar, getting the dir and the + // list need to be refactored out to be done only once. + Path destFile = new Path(destDirPath.toString() + "/" + destFileName); + return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); + } + + private String getSha(Path localFile, Configuration conf) + throws IOException, IllegalArgumentException { + InputStream is = null; + try { + FileSystem localFs = FileSystem.getLocal(conf); + is = localFs.open(localFile); + return DigestUtils.sha256Hex(is); + } finally { + if (is != null) { + is.close(); + } + } + } + @Override public String getDisplayString(String[] children) { assert children.length == 2; http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java new file mode 100644 index 0000000..d0c7c5a --- /dev/null +++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java @@ -0,0 +1,45 @@ +package org.apache.tez.dag.api; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.base.Preconditions; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; + +// Proxy class within the tez.api package to access package private methods. +public class TaskSpecBuilder { + + public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits) { + Vertex vertex = dag.getVertex(vertexName); + ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor(); + List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs = + vertex.getInputs(); + List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> outputs = + vertex.getOutputs(); + + // TODO RSHACK - for now these must be of size 1. 
+ Preconditions.checkState(inputs.size() == 1); + Preconditions.checkState(outputs.size() == 1); + + List<InputSpec> inputSpecs = new ArrayList<>(); + for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) { + InputSpec inputSpec = new InputSpec(input.getName(), input.getIODescriptor(), 1); + inputSpecs.add(inputSpec); + } + + List<OutputSpec> outputSpecs = new ArrayList<>(); + for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output : outputs) { + OutputSpec outputSpec = new OutputSpec(output.getName(), output.getIODescriptor(), 1); + outputSpecs.add(outputSpec); + } + + TaskSpec taskSpec = TaskSpec + .createBaseTaskSpec(dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, + outputSpecs, null); + + return taskSpec; + } + +}
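----------------------------------------------------------------------
The TypeCheckProcFactory hunk above only adds logging: each UDFArgument*Exception is now written to the log with its full stack trace before being rewrapped as a SemanticException, whose message is all that reaches the client. A minimal sketch of that log-then-wrap pattern; the class and method names here are illustrative, not part of the patch:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogThenWrapExample {
  private static final Logger LOG = LoggerFactory.getLogger(LogThenWrapExample.class);

  // Log the original cause with its stack trace, then rethrow as the
  // checked exception the caller expects; only the message travels on.
  static void wrap(UDFArgumentException e) throws SemanticException {
    LOG.error("UDFArgumentException: ", e);
    throw new SemanticException("Invalid arguments: " + e.getMessage(), e);
  }
}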
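----------------------------------------------------------------------
GenericUDFGetSplits.getSplits() refuses to run unless consistent split generation is enabled on both the Tez side and the LLAP client side; the two Preconditions.checkState calls fail otherwise. A small sketch of a JobConf that satisfies those checks before the UDF is invoked (the wrapper class name is hypothetical):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class ConsistentSplitConfExample {
  // Returns a JobConf with the two flags that the Preconditions checks
  // in GenericUDFGetSplits.getSplits() expect to be enabled.
  public static JobConf buildJobConf() {
    JobConf jc = new JobConf(new HiveConf());
    jc.setBoolean(HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS.varname, true);
    jc.setBoolean(HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS.varname, true);
    return jc;
  }
}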
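----------------------------------------------------------------------
Each InputDataInformationEvent is serialized with Hive's pooled Kryo instances via SerializationUtilities.borrowKryo()/releaseKryo(), and the bytes become the per-split payload of the LlapInputSplit. A slightly more defensive sketch of the same borrow/serialize/release pattern, returning the instance in a finally block; it assumes serializeObjectByKryo is accessible to callers the way the patch uses it:

import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;

public class KryoEventSerializer {
  // Serializes any Kryo-friendly object using Hive's pooled Kryo instances.
  // The borrowed instance goes back to the pool even if serialization fails.
  public static byte[] serialize(Object event) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
    Kryo kryo = SerializationUtilities.borrowKryo();
    try {
      SerializationUtilities.serializeObjectByKryo(kryo, event, bos);
    } finally {
      SerializationUtilities.releaseKryo(kryo);
    }
    return bos.toByteArray();
  }
}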
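----------------------------------------------------------------------
createJarLocalResource() localizes the hive-exec jar under a content-addressed name: the SHA-256 of the local file is spliced in before the extension, so identical bits always map to the same destination file in the hive jar directory. The naming logic in isolation (class and method names are illustrative):

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class JarNameExample {
  // e.g. "hive-exec-2.1.0.jar" with sha "abc..." -> "hive-exec-2.1.0-abc....jar"
  public static String contentAddressedName(Path localJar, Configuration conf) throws IOException {
    String sha;
    FileSystem localFs = FileSystem.getLocal(conf);
    try (InputStream is = localFs.open(localJar)) {
      sha = DigestUtils.sha256Hex(is);
    }
    String name = localJar.getName();
    return FilenameUtils.removeExtension(name) + "-" + sha
        + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(name);
  }
}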
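----------------------------------------------------------------------
TaskSpecBuilder sits in org.apache.tez.dag.api only to reach package-private Vertex accessors; it turns the single map vertex into a TaskSpec whose parallelism is the split count, with exactly one root input and one leaf output as enforced by the RSHACK checks. A hedged usage sketch reusing the objects built in getSplits(); the wrapper class is hypothetical, and the placeholder ApplicationId here differs from the patch's Random-based one:

import java.io.IOException;

import org.apache.hadoop.hive.llap.SubmitWorkInfo;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TaskSpecBuilder;
import org.apache.tez.runtime.api.impl.TaskSpec;

public class TaskSpecUsageExample {
  // dag is assumed to already contain the single map vertex named vertexName,
  // as built in GenericUDFGetSplits.getSplits(); numSplits is the split count.
  public static byte[] packageWork(DAG dag, String vertexName, int numSplits) throws IOException {
    TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, numSplits);
    // Placeholder id until a real external client registers the work fragment.
    ApplicationId fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
    SubmitWorkInfo info = new SubmitWorkInfo(taskSpec, fakeAppId);
    return SubmitWorkInfo.toBytes(info);
  }
}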