HIVE-13140. Wire the client to submit execution fragments. (Gunther Hagleitner, Siddharth Seth and Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2e042cc1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2e042cc1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2e042cc1

Branch: refs/heads/llap
Commit: 2e042cc159c0e7e044297fc4d6b177a8841eb7fd
Parents: f272ace
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Feb 23 23:56:38 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Feb 23 23:56:38 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hive/jdbc/LlapInputFormat.java   |   7 +-
 .../hadoop/hive/llap/LlapInputFormat.java       | 198 ++++++++++++
 .../ext/LlapTaskUmbilicalExternalClient.java    |  23 +-
 .../apache/hadoop/hive/llap/LlapInputSplit.java |  77 ++---
 .../apache/hadoop/hive/llap/SubmitWorkInfo.java |  42 ++-
 .../ql/udf/generic/GenericUDFGetSplits.java     | 317 +++++++++----------
 .../org/apache/tez/dag/api/TaskSpecBuilder.java |  27 +-
 7 files changed, 467 insertions(+), 224 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index c38dd82..e662414 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -130,8 +130,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
   @Override
   public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
-    LlapInputSplit llapSplit = (LlapInputSplit)split;
-    return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+    try {
+      return ((InputFormat)Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance()).getRecordReader(split, job, reporter);
+    } catch (Exception e) { throw new IOException(e); }
   }

   @Override
@@ -160,7 +161,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
       ResultSet res = stmt.executeQuery(sql);
       while (res.next()) {
         // deserialize split
-        DataInput in = new DataInputStream(new ByteArrayInputStream(res.getBytes(3)));
+        DataInput in = new DataInputStream(res.getBinaryStream(3));
         InputSplit is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); // todo setAccessible on ctor
         is.readFields(in);
         ins.add(is);

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
new file mode 100644
index 0000000..cf13c1e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; +import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.TokenCache; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { + + private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); + + + public LlapInputFormat() { + } + + /* + * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire + * off the work in the split to LLAP and finally return the connected socket back in an + * LlapRecordReader. The LlapRecordReader class reads the results from the socket. + */ + @Override + public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + + LlapInputSplit llapSplit = (LlapInputSplit) split; + SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); + + int llapSubmitPort = HiveConf.getIntVar(job, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); + + LOG.info("ZZZ: DBG: Starting LlapTaskUmbilicalExternalClient"); + + LlapTaskUmbilicalExternalClient llapClient = + new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), + submitWorkInfo.getToken()); + llapClient.init(job); + llapClient.start(); + + LOG.info("ZZZ: DBG: Crated LlapClient"); + // TODO KKK Shutdown the llap client. 
+ + SubmitWorkRequestProto submitWorkRequestProto = + constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), + llapClient.getAddress(), submitWorkInfo.getToken()); + + LOG.info("ZZZ: DBG: Created submitWorkRequest for: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()); + + TezEvent tezEvent = new TezEvent(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length); + tezEvent.readFields(dib); + List<TezEvent> tezEventList = Lists.newArrayList(); + tezEventList.add(tezEvent); + + // this is just the portion that sets up the io to receive data + String host = split.getLocations()[0]; + + llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList); + + String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum(); + + HiveConf conf = new HiveConf(); + Socket socket = new Socket(host, + conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); + + LOG.debug("Socket connected"); + + socket.getOutputStream().write(id.getBytes()); + socket.getOutputStream().write(0); + socket.getOutputStream().flush(); + + LOG.debug("Registered id: " + id); + + return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + throw new IOException("These are not the splits you are looking for."); + } + + private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, + int taskNum, + InetSocketAddress address, + Token<JobTokenIdentifier> token) throws + IOException { + TaskSpec taskSpec = submitWorkInfo.getTaskSpec(); + ApplicationId appId = submitWorkInfo.getFakeAppId(); + + SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); + // This works, assuming the executor is running within YARN. + LOG.info("DBG: Setting user in submitWorkRequest to: " + + System.getenv(ApplicationConstants.Environment.USER.name())); + builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); + builder.setApplicationIdString(appId.toString()); + builder.setAppAttemptNumber(0); + builder.setTokenIdentifier(appId.toString()); + + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); + builder.setContainerIdString(containerId.toString()); + + + builder.setAmHost(address.getHostName()); + builder.setAmPort(address.getPort()); + Credentials taskCredentials = new Credentials(); + // Credentials can change across DAGs. Ideally construct only once per DAG. + // TODO Figure out where credentials will come from. Normally Hive sets up + // URLs on the tez dag, for which Tez acquires credentials. 
+ +// taskCredentials.addAll(getContext().getCredentials()); + +// Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == +// taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); +// ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); +// if (credentialsBinary == null) { +// credentialsBinary = serializeCredentials(getContext().getCredentials()); +// credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); +// } else { +// credentialsBinary = credentialsBinary.duplicate(); +// } +// builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + Credentials credentials = new Credentials(); + TokenCache.setSessionToken(token, credentials); + ByteBuffer credentialsBinary = serializeCredentials(credentials); + builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + + + builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec)); + + FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder(); + runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis()); + runtimeInfo.setWithinDagPriority(0); + runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime()); + runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime()); + runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism()); + runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0); + + + builder.setUsingTezAm(false); + builder.setFragmentRuntimeInfo(runtimeInfo.build()); + return builder.build(); + } + + private ByteBuffer serializeCredentials(Credentials credentials) throws IOException { + Credentials containerCredentials = new Credentials(); + containerCredentials.addAll(credentials); + DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); + containerCredentials.writeTokenStorageToStream(containerTokens_dob); + return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index ecc032d..4305682 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -1,6 +1,8 @@ package org.apache.hadoop.hive.llap.ext; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -55,6 +57,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { this.sessionToken = sessionToken; // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough. 
this.communicator = new LlapProtocolClientProxy(1, conf, null); + this.communicator.init(conf); } @Override @@ -62,6 +65,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { int numHandlers = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS); llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken); + communicator.start(); } @Override @@ -72,24 +76,31 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { } } + public InetSocketAddress getAddress() { + return llapTaskUmbilicalServer.getAddress(); + } + /** * Submit the work for actual execution. This should always have the usingTezAm flag disabled * @param submitWorkRequestProto */ - public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort) { + public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) { Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false); - // Store the actual event first. To be returned on the first heartbeat. - Event mrInputEvent = null; - // Construct a TezEvent out of this, to send it out on the next heaertbeat + LOG.warn("ZZZ: DBG: " + " Submitting fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " on host: " + llapHost + ", port=" + llapPort); +// LOG.info("ZZZ: DBG: " + " Complete SubmitWorkRequest: " + submitWorkRequestProto); // submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + LOG.info("ZZZ: DBG: Received {} events for {}", tezEvents.size(), submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()); + // Register the pending events to be sent for this spec. + pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents); // Send out the actual SubmitWorkRequest communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort, new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() { + @Override public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) { if (response.hasSubmissionState()) { @@ -110,6 +121,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { + // // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment. // // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable // QueryIdentifierProto queryIdentifier = QueryIdentifierProto @@ -157,6 +169,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString()); List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString()); + if (tezEvents == null) { + tezEvents = Collections.emptyList(); + } response.setLastRequestId(request.getRequestId()); // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task. 
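
For reference, a condensed sketch (not part of the patch itself) of the submission flow the two files above implement: unpack the SubmitWorkInfo from the split's plan bytes, stand up the external umbilical client as the heartbeat endpoint, rehydrate the serialized TezEvent from the fragment bytes, and submit the fragment to the daemon. It assumes the classes and signatures added in this commit; submitFragment, request and llapSubmitPort are illustrative placeholders, and building the SubmitWorkRequestProto itself is shown in LlapInputFormat.constructSubmitWorkRequestProto above.

    import java.net.InetSocketAddress;
    import java.util.List;

    import com.google.common.collect.Lists;
    import org.apache.hadoop.hive.llap.LlapInputSplit;
    import org.apache.hadoop.hive.llap.SubmitWorkInfo;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
    import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.runtime.api.impl.TezEvent;

    public class FragmentSubmissionSketch {
      static void submitFragment(LlapInputSplit llapSplit, JobConf job, int llapSubmitPort,
          SubmitWorkRequestProto request) throws Exception {
        // The plan bytes carry the TaskSpec, the fake ApplicationId and the umbilical job token.
        SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());

        // The external umbilical client hosts the endpoint the daemon heartbeats back to;
        // its address supplies the amHost/amPort fields of the request.
        LlapTaskUmbilicalExternalClient client = new LlapTaskUmbilicalExternalClient(
            job, submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken());
        client.init(job);
        client.start();
        InetSocketAddress umbilicalAddress = client.getAddress();

        // The fragment bytes are a serialized TezEvent (the input data-information event);
        // it is registered as pending and handed back on the fragment's first heartbeat.
        TezEvent tezEvent = new TezEvent();
        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
        tezEvent.readFields(dib);
        List<TezEvent> events = Lists.newArrayList(tezEvent);

        // 'request' is expected to have been built against umbilicalAddress, e.g. as in
        // LlapInputFormat.constructSubmitWorkRequestProto above.
        client.submitWork(request, llapSplit.getLocations()[0], llapSubmitPort, events);
      }
    }
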
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java index 4249a16..d26a579 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java @@ -23,44 +23,26 @@ import java.io.IOException; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.AutoExpandingBuffer; -import org.apache.thrift.transport.AutoExpandingBufferWriteTransport; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TSerializer; public class LlapInputSplit implements InputSplitWithLocationInfo { + int splitNum; byte[] planBytes; byte[] fragmentBytes; SplitLocationInfo[] locations; Schema schema; - - // // Static - // ContainerIdString - // DagName - // VertexName - // FragmentNumber - // AttemptNumber - always 0 - // FragmentIdentifierString - taskAttemptId - - // ProcessorDescsriptor - // InputSpec - // OutputSpec - - // Tokens - - // // Dynamic - // - public LlapInputSplit() { } - public LlapInputSplit(byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) { + public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) { this.planBytes = planBytes; this.fragmentBytes = fragmentBytes; this.locations = locations; this.schema = schema; + this.splitNum = splitNum; } public Schema getSchema() { @@ -81,8 +63,23 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { return locs; } + public int getSplitNum() { + return splitNum; + } + + public byte[] getPlanBytes() { + return planBytes; + } + + public byte[] getFragmentBytes() { + return fragmentBytes; + } + + + @Override public void write(DataOutput out) throws IOException { + out.writeInt(splitNum); out.writeInt(planBytes.length); out.write(planBytes); @@ -97,20 +94,24 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { byte[] binarySchema; try { - AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d); - TProtocol protocol = new TBinaryProtocol(transport); - schema.write(protocol); - binarySchema = transport.getBuf().array(); + TSerializer serializer = new TSerializer(); + byte[] serialzied = serializer.serialize(schema); + out.writeInt(serialzied.length); + out.write(serialzied); +// AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d); +// TProtocol protocol = new TBinaryProtocol(transport); +// schema.write(protocol); +// binarySchema = transport.getBuf().array(); } catch (Exception e) { throw new IOException(e); } - out.writeInt(binarySchema.length); - out.write(binarySchema); + } @Override public void readFields(DataInput in) throws IOException { + splitNum = in.readInt(); int length = in.readInt(); planBytes = new byte[length]; in.readFully(planBytes); @@ -129,14 +130,18 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { length = in.readInt(); try { - AutoExpandingBufferWriteTransport transport = - new AutoExpandingBufferWriteTransport(length, 2d); - 
AutoExpandingBuffer buf = transport.getBuf(); - in.readFully(buf.array(), 0, length); - - TProtocol protocol = new TBinaryProtocol(transport); + byte[] schemaBytes = new byte[length]; + in.readFully(schemaBytes); + TDeserializer tDeserializer = new TDeserializer(); schema = new Schema(); - schema.read(protocol); + tDeserializer.deserialize(schema, schemaBytes); +// AutoExpandingBufferReadTransport transport = new AutoExpandingBufferReadTransport(length, 2d); +// AutoExpandingBuffer buf = transport.getBuf(); +// in.readFully(buf.array(), 0, length); +// +// TProtocol protocol = new TBinaryProtocol(transport); +// schema = new Schema(); +// schema.read(protocol); } catch (Exception e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java index a9a3738..83149ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java @@ -6,18 +6,29 @@ import java.io.IOException; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.runtime.api.impl.TaskSpec; public class SubmitWorkInfo implements Writable { private TaskSpec taskSpec; private ApplicationId fakeAppId; + private long creationTime; - public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId) { + // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to + // talk to LLAP daemons itself via the securit work. + private Token<JobTokenIdentifier> token; + + public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) { this.taskSpec = taskSpec; this.fakeAppId = fakeAppId; + this.token = createJobToken(); + this.creationTime = creationTime; } // Empty constructor for writable etc. 
@@ -32,11 +43,25 @@ public class SubmitWorkInfo implements Writable { return fakeAppId; } + public String getTokenIdentifier() { + return fakeAppId.toString(); + } + + public Token<JobTokenIdentifier> getToken() { + return token; + } + + public long getCreationTime() { + return creationTime; + } + @Override public void write(DataOutput out) throws IOException { taskSpec.write(out); out.writeLong(fakeAppId.getClusterTimestamp()); out.writeInt(fakeAppId.getId()); + token.write(out); + out.writeLong(creationTime); } @Override @@ -46,6 +71,9 @@ public class SubmitWorkInfo implements Writable { long appIdTs = in.readLong(); int appIdId = in.readInt(); fakeAppId = ApplicationId.newInstance(appIdTs, appIdId); + token = new Token<>(); + token.readFields(in); + creationTime = in.readLong(); } public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException { @@ -54,7 +82,7 @@ public class SubmitWorkInfo implements Writable { return dob.getData(); } - public SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException { + public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException { DataInputBuffer dib = new DataInputBuffer(); dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length); SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(); @@ -62,4 +90,14 @@ public class SubmitWorkInfo implements Writable { return submitWorkInfo; } + + private Token<JobTokenIdentifier> createJobToken() { + String tokenIdentifier = fakeAppId.toString(); + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( + tokenIdentifier)); + Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, + new JobTokenSecretManager()); + sessionToken.setService(identifier.getJobId()); + return sessionToken; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java index 9c7e1f2..9fa4aa8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java @@ -18,131 +18,79 @@ package org.apache.hadoop.hive.ql.udf.generic; -import org.apache.hadoop.hive.llap.LlapInputSplit; -import org.apache.hadoop.hive.llap.SubmitWorkInfo; - +import javax.security.auth.login.LoginException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.Set; - -import com.google.common.base.Preconditions; - -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; - -import javax.security.auth.login.LoginException; - -import java.util.Arrays; -import java.util.List; -import java.util.ArrayList; import java.util.UUID; -import java.io.Serializable; -import java.io.IOException; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.DataOutput; import com.esotericsoftware.kryo.Kryo; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; -import java.io.InputStream; - -import 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TaskSpecBuilder; -import org.apache.tez.runtime.api.impl.TaskSpec; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hive.ql.udf.UDFType; +import com.google.common.base.Preconditions; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.llap.LlapInputSplit; +import org.apache.hadoop.hive.llap.LlapOutputFormat; +import org.apache.hadoop.hive.llap.SubmitWorkInfo; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.tez.TezTask; -import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.MapredContext; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.llap.LlapInputFormat; -import org.apache.hadoop.hive.llap.LlapOutputFormat; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.metastore.api.Schema; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URISyntaxException; -import java.io.FileNotFoundException; -import java.util.UUID; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.FilenameUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.Vertex; -import org.apache.tez.runtime.api.Event; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.exec.tez.DagUtils; -import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.tez.dag.api.TaskLocationHint; -import org.apache.tez.dag.api.VertexLocationHint; -import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; -import org.apache.tez.mapreduce.hadoop.MRInputHelpers; -import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; -import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; -import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; +import org.apache.tez.dag.api.TaskSpecBuilder; +import org.apache.tez.dag.api.Vertex; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.InputInitializer; -import org.apache.tez.runtime.api.InputInitializerContext; -import org.apache.tez.runtime.api.InputSpecUpdate; import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; -import org.apache.tez.runtime.api.events.InputDataInformationEvent; -import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * GenericUDFGetSplits. 
@@ -155,6 +103,8 @@ public class GenericUDFGetSplits extends GenericUDF { private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class); + private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat"; + private transient StringObjectInspector stringOI; private transient IntObjectInspector intOI; private final ArrayList<Object> retArray = new ArrayList<Object>(); @@ -190,13 +140,13 @@ public class GenericUDFGetSplits extends GenericUDF { } else if (!(arguments[0] instanceof StringObjectInspector)) { LOG.error("Got "+arguments[0].getTypeName()+" instead of string."); throw new UDFArgumentTypeException(0, "\"" - + "string\" is expected at function GET_SPLITS, " + "but \"" - + arguments[0].getTypeName() + "\" is found"); + + "string\" is expected at function GET_SPLITS, " + "but \"" + + arguments[0].getTypeName() + "\" is found"); } else if (!(arguments[1] instanceof IntObjectInspector)) { LOG.error("Got "+arguments[1].getTypeName()+" instead of int."); throw new UDFArgumentTypeException(1, "\"" - + "int\" is expected at function GET_SPLITS, " + "but \"" - + arguments[1].getTypeName() + "\" is found"); + + "int\" is expected at function GET_SPLITS, " + "but \"" + + arguments[1].getTypeName() + "\" is found"); } stringOI = (StringObjectInspector) arguments[0]; @@ -204,9 +154,9 @@ public class GenericUDFGetSplits extends GenericUDF { List<String> names = Arrays.asList("if_class","split_class","split"); List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList( - PrimitiveObjectInspectorFactory.javaStringObjectInspector, - PrimitiveObjectInspectorFactory.javaStringObjectInspector, - PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs); ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI); bos = new ByteArrayOutputStream(1024); @@ -233,80 +183,85 @@ public class GenericUDFGetSplits extends GenericUDF { throw new HiveException("Need configuration"); } - LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.toString()+"\""); - HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none"); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.toString()); + String fetchTaskConversion = HiveConf.getVar(conf, ConfVars.HIVEFETCHTASKCONVERSION); + String queryResultFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - cpr = driver.compileAndRespond(query); - if(cpr.getResponseCode() != 0) { - throw new HiveException("Failed to compile query: "+cpr.getException()); - } + try { + LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.getName()+"\""); + HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.getName()); - QueryPlan plan = driver.getPlan(); - List<Task<?>> roots = plan.getRootTasks(); - Schema schema = plan.getResultSchema(); + cpr = driver.compileAndRespond(query); + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to compile query: "+cpr.getException()); + } - if (roots == null || roots.size() != 1 || !(roots.get(0) 
instanceof TezTask)) { - throw new HiveException("Was expecting a single TezTask."); - } + QueryPlan plan = driver.getPlan(); + List<Task<?>> roots = plan.getRootTasks(); + Schema schema = plan.getResultSchema(); - Path data = null; - String ifc = null; + if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { + throw new HiveException("Was expecting a single TezTask."); + } - TezWork tezWork = ((TezTask)roots.get(0)).getWork(); + Path data = null; - if (tezWork.getAllWork().size() != 1) { + TezWork tezWork = ((TezTask)roots.get(0)).getWork(); - String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", ""); + if (tezWork.getAllWork().size() != 1) { - String ctas = "create temporary table "+tableName+" as "+query; - LOG.info("CTAS: "+ctas); + String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", ""); - try { - cpr = driver.run(ctas, false); - } catch(CommandNeedRetryException e) { - throw new HiveException(e); - } + String ctas = "create temporary table "+tableName+" as "+query; + LOG.info("CTAS: "+ctas); - if(cpr.getResponseCode() != 0) { - throw new HiveException("Failed to create temp table: " + cpr.getException()); - } + try { + cpr = driver.run(ctas, false); + } catch(CommandNeedRetryException e) { + throw new HiveException(e); + } - query = "select * from " + tableName; - cpr = driver.compileAndRespond(query); - if(cpr.getResponseCode() != 0) { - throw new HiveException("Failed to create temp table: "+cpr.getException()); - } + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to create temp table: " + cpr.getException()); + } - plan = driver.getPlan(); - roots = plan.getRootTasks(); - schema = plan.getResultSchema(); + query = "select * from " + tableName; + cpr = driver.compileAndRespond(query); + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to create temp table: "+cpr.getException()); + } - if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { - throw new HiveException("Was expecting a single TezTask."); - } + plan = driver.getPlan(); + roots = plan.getRootTasks(); + schema = plan.getResultSchema(); - tezWork = ((TezTask)roots.get(0)).getWork(); - } + if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { + throw new HiveException("Was expecting a single TezTask."); + } - MapWork w = (MapWork)tezWork.getAllWork().get(0); - ifc = LlapInputFormat.class.toString(); + tezWork = ((TezTask)roots.get(0)).getWork(); + } - try { - for (InputSplit s: getSplits(jc, num, tezWork, schema)) { - Object[] os = new Object[3]; - os[0] = ifc; - os[1] = s.getClass().toString(); - bos.reset(); - s.write(dos); - byte[] frozen = bos.toByteArray(); - os[2] = frozen; - retArray.add(os); + MapWork w = (MapWork)tezWork.getAllWork().get(0); + + try { + for (InputSplit s: getSplits(jc, num, tezWork, schema)) { + Object[] os = new Object[3]; + os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME; + os[1] = s.getClass().getName(); + bos.reset(); + s.write(dos); + byte[] frozen = bos.toByteArray(); + os[2] = frozen; + retArray.add(os); + } + } catch(Exception e) { + throw new HiveException(e); } - } catch(Exception e) { - throw new HiveException(e); + } finally { + HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, fetchTaskConversion); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, queryResultFormat); } - return retArray; } @@ -332,6 +287,7 @@ public class GenericUDFGetSplits extends GenericUDF { dag.addVertex(wx); 
utils.addCredentials(mapWork, dag); + // we have the dag now proceed to get the splits: HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null); Preconditions.checkState(HiveConf.getBoolVar(wxConf, @@ -342,8 +298,8 @@ public class GenericUDFGetSplits extends GenericUDF { List<Event> eventList = splitGenerator.initialize(); // hack - just serializing with kryo for now. This needs to be done properly - InputSplit[] result = new InputSplit[eventList.size()]; - ByteArrayOutputStream bos = new ByteArrayOutputStream(10240); + InputSplit[] result = new InputSplit[eventList.size() - 1]; + DataOutputBuffer dob = new DataOutputBuffer(); InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0); @@ -351,11 +307,25 @@ public class GenericUDFGetSplits extends GenericUDF { Preconditions.checkState(hints.size() == eventList.size() -1); + LOG.error("DBG: NumEvents=" + eventList.size()); + LOG.error("DBG: NumSplits=" + result.length); + + ApplicationId fakeApplicationId = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0); + TaskSpec taskSpec = + new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1, fakeApplicationId); + + SubmitWorkInfo submitWorkInfo = + new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis()); + EventMetaData sourceMetaData = + new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName, + "NULL_VERTEX", null); + EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx); + LOG.info("DBG: Number of splits: " + (eventList.size() - 1)); - for (int i = 1 ; i < eventList.size() ; i++) { + for (int i = 0; i < eventList.size() - 1; i++) { // Creating the TezEvent here itself, since it's easy to serialize. - Event event = eventList.get(i); - TaskLocationHint hint = hints.get(i-1); + Event event = eventList.get(i + 1); + TaskLocationHint hint = hints.get(i); Set<String> hosts = hint.getHosts(); LOG.info("DBG: Using locations: " + hosts.toString()); if (hosts.size() != 1) { @@ -367,18 +337,17 @@ public class GenericUDFGetSplits extends GenericUDF { for (String host : hosts) { locations[j++] = new SplitLocationInfo(host, false); } + TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis()); + tezEvent.setDestinationInfo(destinationMetaInfo); bos.reset(); - Kryo kryo = SerializationUtilities.borrowKryo(); - SerializationUtilities.serializeObjectByKryo(kryo, event, bos); - SerializationUtilities.releaseKryo(kryo); + dob.reset(); + tezEvent.write(dob); - TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1); - ApplicationId fakeApplicationId = ApplicationId.newInstance(new Random().nextInt(), 0); - SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(taskSpec, fakeApplicationId); byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); - result[i-1] = new LlapInputSplit(submitWorkBytes, bos.toByteArray(), locations, schema); + result[i] = + new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema); } return result; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java index d0c7c5a..5cabb6a 100644 --- a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java +++ 
b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java @@ -4,6 +4,12 @@ import java.util.ArrayList; import java.util.List; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -11,7 +17,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec; // Proxy class within the tez.api package to access package private methods. public class TaskSpecBuilder { - public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits) { + public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId) { Vertex vertex = dag.getVertex(vertexName); ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor(); List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs = @@ -35,11 +41,22 @@ public class TaskSpecBuilder { outputSpecs.add(outputSpec); } - TaskSpec taskSpec = TaskSpec - .createBaseTaskSpec(dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, - outputSpecs, null); + TezDAGID dagId = TezDAGID.getInstance(appId, 0); + TezVertexID vertexId = TezVertexID.getInstance(dagId, 0); + TezTaskID taskId = TezTaskID.getInstance(vertexId, 0); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0); + return new TaskSpec(taskAttemptId, dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, outputSpecs, null); + } - return taskSpec; + public EventMetaData getDestingationMetaData(Vertex vertex) { + List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs = + vertex.getInputs(); + Preconditions.checkState(inputs.size() == 1); + String inputName = inputs.get(0).getName(); + EventMetaData destMeta = + new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertex.getName(), + inputName, null); + return destMeta; } }
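
LlapInputSplit now round-trips its Schema through Thrift's TSerializer/TDeserializer instead of the AutoExpandingBufferWriteTransport code it replaces. A minimal write()/readFields() round-trip sketch, assuming the constructor and accessors added in this commit; the column definition, dummy plan/fragment bytes and localhost location are illustrative only:

    import java.util.HashMap;

    import com.google.common.collect.Lists;
    import org.apache.hadoop.hive.llap.LlapInputSplit;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Schema;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapred.SplitLocationInfo;

    public class LlapInputSplitRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema(
            Lists.newArrayList(new FieldSchema("col1", "string", null)),
            new HashMap<String, String>());
        SplitLocationInfo[] locations = { new SplitLocationInfo("localhost", false) };
        byte[] planBytes = new byte[] { 1, 2, 3 };     // stands in for SubmitWorkInfo.toBytes(...)
        byte[] fragmentBytes = new byte[] { 4, 5, 6 }; // stands in for a serialized TezEvent

        LlapInputSplit split = new LlapInputSplit(0, planBytes, fragmentBytes, locations, schema);

        // Serialize the split the way GenericUDFGetSplits hands it back to the caller...
        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);

        // ...and rehydrate it the way the jdbc-side reader does via readFields().
        LlapInputSplit copy = new LlapInputSplit();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), 0, out.getLength());
        copy.readFields(in);

        System.out.println(copy.getSplitNum() + " " + copy.getSchema().getFieldSchemas());
      }
    }

Besides being shorter, the TSerializer path writes only the exact serialized length; the replaced code wrote the whole auto-expanding buffer capacity and tried to read back through a write-only transport.
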