HIVE-13324. LLAP: history log for FRAGMENT_START doesn't log DagId correctly. (Siddharth Seth, Reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3038b05e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3038b05e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3038b05e Branch: refs/heads/llap Commit: 3038b05ed346f4b5438e9072edb19186ea90d042 Parents: 2449d1d Author: Siddharth Seth <ss...@apache.org> Authored: Sat Mar 26 14:12:36 2016 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Sat Mar 26 14:12:36 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/llap/tez/Converters.java | 1 + .../hadoop/hive/llap/tez/TestConverters.java | 190 +++++++++++++++++++ 2 files changed, 191 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3038b05e/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java index a5c3631..ec6e439 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java @@ -85,6 +85,7 @@ public class Converters { FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder(); builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString()); builder.setDagName(taskSpec.getDAGName()); + builder.setDagId(taskSpec.getDagIdentifier()); builder.setVertexName(taskSpec.getVertexName()); builder.setVertexParallelism(taskSpec.getVertexParallelism()); builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId()); http://git-wip-us.apache.org/repos/asf/hive/blob/3038b05e/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java new file mode 100644 index 0000000..d4cdac1 --- /dev/null +++ b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java @@ -0,0 +1,190 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tez; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.junit.Test; + +public class TestConverters { + + @Test(timeout = 5000) + public void testTaskSpecToFragmentSpec() { + ByteBuffer procBb = ByteBuffer.allocate(4); + procBb.putInt(0, 200); + UserPayload processorPayload = UserPayload.create(procBb); + ProcessorDescriptor processorDescriptor = + ProcessorDescriptor.create("fakeProcessorName").setUserPayload(processorPayload); + + ByteBuffer input1Bb = ByteBuffer.allocate(4); + input1Bb.putInt(0, 300); + UserPayload input1Payload = UserPayload.create(input1Bb); + InputDescriptor id1 = InputDescriptor.create("input1ClassName").setUserPayload(input1Payload); + InputSpec inputSpec1 = new InputSpec("sourceVertexName1", id1, 33); + InputSpec inputSpec2 = new InputSpec("sourceVertexName2", id1, 44); + List<InputSpec> inputSpecList = Lists.newArrayList(inputSpec1, inputSpec2); + + ByteBuffer output1Bb = ByteBuffer.allocate(4); + output1Bb.putInt(0, 400); + UserPayload output1Payload = UserPayload.create(output1Bb); + OutputDescriptor od1 = + OutputDescriptor.create("output1ClassName").setUserPayload(output1Payload); + OutputSpec outputSpec1 = new OutputSpec("destVertexName1", od1, 55); + OutputSpec outputSpec2 = new OutputSpec("destVertexName2", od1, 66); + List<OutputSpec> outputSpecList = Lists.newArrayList(outputSpec1, outputSpec2); + + ApplicationId appId = ApplicationId.newInstance(1000, 100); + TezDAGID tezDagId = TezDAGID.getInstance(appId, 300); + TezVertexID tezVertexId = TezVertexID.getInstance(tezDagId, 400); + TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500); + TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tezTaskId, 600); + + TaskSpec taskSpec = + new TaskSpec(tezTaskAttemptId, "dagName", "vertexName", 10, processorDescriptor, + inputSpecList, outputSpecList, null); + + + FragmentSpecProto fragmentSpecProto = Converters.convertTaskSpecToProto(taskSpec); + + + assertEquals("dagName", fragmentSpecProto.getDagName()); + assertEquals("vertexName", fragmentSpecProto.getVertexName()); + assertEquals(tezTaskAttemptId.toString(), fragmentSpecProto.getFragmentIdentifierString()); + assertEquals(tezDagId.getId(), fragmentSpecProto.getDagId()); + assertEquals(tezTaskAttemptId.getId(), fragmentSpecProto.getAttemptNumber()); + assertEquals(tezTaskId.getId(), fragmentSpecProto.getFragmentNumber()); + assertEquals(processorDescriptor.getClassName(), + fragmentSpecProto.getProcessorDescriptor().getClassName()); + assertEquals(processorDescriptor.getUserPayload().getPayload(), + fragmentSpecProto.getProcessorDescriptor().getUserPayload().getUserPayload() + .asReadOnlyByteBuffer()); + assertEquals(2, fragmentSpecProto.getInputSpecsCount()); + assertEquals(2, fragmentSpecProto.getOutputSpecsCount()); + + verifyInputSpecAndProto(inputSpec1, fragmentSpecProto.getInputSpecs(0)); + verifyInputSpecAndProto(inputSpec2, fragmentSpecProto.getInputSpecs(1)); + verifyOutputSpecAndProto(outputSpec1, fragmentSpecProto.getOutputSpecs(0)); + verifyOutputSpecAndProto(outputSpec2, fragmentSpecProto.getOutputSpecs(1)); + + } + + @Test (timeout = 5000) + public void testFragmentSpecToTaskSpec() { + + ByteBuffer procBb = ByteBuffer.allocate(4); + procBb.putInt(0, 200); + + ByteBuffer input1Bb = ByteBuffer.allocate(4); + input1Bb.putInt(0, 300); + + ByteBuffer output1Bb = ByteBuffer.allocate(4); + output1Bb.putInt(0, 400); + + ApplicationId appId = ApplicationId.newInstance(1000, 100); + TezDAGID tezDagId = TezDAGID.getInstance(appId, 300); + TezVertexID tezVertexId = TezVertexID.getInstance(tezDagId, 400); + TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500); + TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tezTaskId, 600); + + FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder(); + builder.setFragmentIdentifierString(tezTaskAttemptId.toString()); + builder.setDagName("dagName"); + builder.setVertexName("vertexName"); + builder.setDagId(tezDagId.getId()); + builder.setProcessorDescriptor( + EntityDescriptorProto.newBuilder().setClassName("fakeProcessorName").setUserPayload( + UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(procBb)))); + builder.addInputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("sourceVertexName1") + .setPhysicalEdgeCount(33).setIoDescriptor( + EntityDescriptorProto.newBuilder().setClassName("input1ClassName").setUserPayload( + UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(input1Bb))))); + builder.addInputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("sourceVertexName2") + .setPhysicalEdgeCount(44).setIoDescriptor( + EntityDescriptorProto.newBuilder().setClassName("input1ClassName").setUserPayload( + UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(input1Bb))))); + builder.addOutputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("destVertexName1") + .setPhysicalEdgeCount(55).setIoDescriptor( + EntityDescriptorProto.newBuilder().setClassName("outputClassName").setUserPayload( + UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(output1Bb))))); + builder.addOutputSpecs(IOSpecProto.newBuilder().setConnectedVertexName("destVertexName2") + .setPhysicalEdgeCount(66).setIoDescriptor( + EntityDescriptorProto.newBuilder().setClassName("outputClassName").setUserPayload( + UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(output1Bb))))); + + FragmentSpecProto fragmentSpecProto = builder.build(); + + TaskSpec taskSpec = Converters.getTaskSpecfromProto(fragmentSpecProto); + + assertEquals("dagName", taskSpec.getDAGName()); + assertEquals("vertexName", taskSpec.getVertexName()); + assertEquals(tezTaskAttemptId, taskSpec.getTaskAttemptID()); + assertEquals("fakeProcessorName", taskSpec.getProcessorDescriptor().getClassName()); + byte[] serialized = new byte[taskSpec.getProcessorDescriptor().getUserPayload().getPayload().remaining()]; + taskSpec.getProcessorDescriptor().getUserPayload().getPayload().get(serialized); + assertArrayEquals(procBb.array(), serialized); + + assertEquals(2, taskSpec.getInputs().size()); + assertEquals(2, taskSpec.getOutputs().size()); + + verifyInputSpecAndProto(taskSpec.getInputs().get(0), fragmentSpecProto.getInputSpecs(0)); + verifyInputSpecAndProto(taskSpec.getInputs().get(1), fragmentSpecProto.getInputSpecs(1)); + verifyOutputSpecAndProto(taskSpec.getOutputs().get(0), fragmentSpecProto.getOutputSpecs(0)); + verifyOutputSpecAndProto(taskSpec.getOutputs().get(1), fragmentSpecProto.getOutputSpecs(1)); + + + } + + private void verifyInputSpecAndProto(InputSpec inputSpec, + IOSpecProto inputSpecProto) { + assertEquals(inputSpec.getPhysicalEdgeCount(), inputSpecProto.getPhysicalEdgeCount()); + assertEquals(inputSpec.getSourceVertexName(), inputSpecProto.getConnectedVertexName()); + assertEquals(inputSpec.getInputDescriptor().getClassName(), + inputSpecProto.getIoDescriptor().getClassName()); + assertEquals(inputSpec.getInputDescriptor().getUserPayload().getPayload(), + inputSpecProto.getIoDescriptor().getUserPayload().getUserPayload().asReadOnlyByteBuffer()); + } + + private void verifyOutputSpecAndProto(OutputSpec outputSpec, + IOSpecProto outputSpecProto) { + assertEquals(outputSpec.getPhysicalEdgeCount(), outputSpecProto.getPhysicalEdgeCount()); + assertEquals(outputSpec.getDestinationVertexName(), outputSpecProto.getConnectedVertexName()); + assertEquals(outputSpec.getOutputDescriptor().getClassName(), + outputSpecProto.getIoDescriptor().getClassName()); + assertEquals(outputSpec.getOutputDescriptor().getUserPayload().getPayload(), + outputSpecProto.getIoDescriptor().getUserPayload().getUserPayload().asReadOnlyByteBuffer()); + } +}