http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java deleted file mode 100644 index c133070..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ /dev/null @@ -1,265 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; -import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import 
org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.util.Records; -import org.junit.Assert; -import org.junit.Test; - -public class TestRPC { - - private static final String EXCEPTION_MSG = "test error"; - private static final String EXCEPTION_CAUSE = "exception cause"; - private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - - @Test - public void testUnknownCall() { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class - .getName()); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - Server server = rpc.getServer(ContainerManagementProtocol.class, - new DummyContainerManager(), addr, conf, null, 1); - server.start(); - - // Any unrelated protocol would do - ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy( - ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf); - - try { - proxy.getNewApplication(Records - .newRecord(GetNewApplicationRequest.class)); - Assert.fail("Excepted RPC call to fail with unknown method."); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().matches( - "Unknown method getNewApplication called on.*" - + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" - + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol.")); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void testHadoopProtoRPC() throws Exception { - test(HadoopYarnProtoRPC.class.getName()); - } - - private void test(String rpcClass) throws Exception { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - Server server = rpc.getServer(ContainerManagementProtocol.class, - new DummyContainerManager(), addr, conf, null, 1); - server.start(); - RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class); - ContainerManagementProtocol proxy = (ContainerManagementProtocol) - rpc.getProxy(ContainerManagementProtocol.class, - NetUtils.getConnectAddress(server), conf); - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - ApplicationId applicationId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId applicationAttemptId = - ApplicationAttemptId.newInstance(applicationId, 0); - ContainerId containerId = - ContainerId.newContainerId(applicationAttemptId, 100); - NodeId nodeId = NodeId.newInstance("localhost", 1234); - Resource resource = Resource.newInstance(1234, 2); - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier(containerId, "localhost", "user", - resource, System.currentTimeMillis() + 10000, 42, 42, - Priority.newInstance(0), 0); - Token containerToken = newContainerToken(nodeId, "password".getBytes(), - containerTokenIdentifier); - - 
StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - containerToken); - List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - proxy.startContainers(allRequests); - - List<ContainerId> containerIds = new ArrayList<ContainerId>(); - containerIds.add(containerId); - GetContainerStatusesRequest gcsRequest = - GetContainerStatusesRequest.newInstance(containerIds); - GetContainerStatusesResponse response = - proxy.getContainerStatuses(gcsRequest); - List<ContainerStatus> statuses = response.getContainerStatuses(); - - //test remote exception - boolean exception = false; - try { - StopContainersRequest stopRequest = - recordFactory.newRecordInstance(StopContainersRequest.class); - stopRequest.setContainerIds(containerIds); - proxy.stopContainers(stopRequest); - } catch (YarnException e) { - exception = true; - Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG)); - Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE)); - System.out.println("Test Exception is " + e.getMessage()); - } catch (Exception ex) { - ex.printStackTrace(); - } - Assert.assertTrue(exception); - - server.stop(); - Assert.assertNotNull(statuses.get(0)); - Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState()); - } - - public class DummyContainerManager implements ContainerManagementProtocol { - - private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>(); - - @Override - public GetContainerStatusesResponse getContainerStatuses( - GetContainerStatusesRequest request) - throws YarnException { - GetContainerStatusesResponse response = - recordFactory.newRecordInstance(GetContainerStatusesResponse.class); - response.setContainerStatuses(statuses); - return response; - } - - @Override - public StartContainersResponse startContainers( - StartContainersRequest requests) throws YarnException { - StartContainersResponse response = - recordFactory.newRecordInstance(StartContainersResponse.class); - for (StartContainerRequest request : requests.getStartContainerRequests()) { - Token containerToken = request.getContainerToken(); - ContainerTokenIdentifier tokenId = null; - - try { - tokenId = newContainerTokenIdentifier(containerToken); - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); - } - ContainerStatus status = - recordFactory.newRecordInstance(ContainerStatus.class); - status.setState(ContainerState.RUNNING); - status.setContainerId(tokenId.getContainerID()); - status.setExitStatus(0); - statuses.add(status); - - } - return response; - } - - @Override - public StopContainersResponse stopContainers(StopContainersRequest request) - throws YarnException { - Exception e = new Exception(EXCEPTION_MSG, - new Exception(EXCEPTION_CAUSE)); - throw new YarnException(e); - } - - @Override - public IncreaseContainersResourceResponse increaseContainersResource( - IncreaseContainersResourceRequest request) throws YarnException, IOException { - return null; - } - - @Override - public SignalContainerResponse signalToContainer( - SignalContainerRequest request) throws YarnException { - final Exception e = new Exception(EXCEPTION_MSG, - new Exception(EXCEPTION_CAUSE)); - throw new YarnException(e); - } - } - - public static ContainerTokenIdentifier newContainerTokenIdentifier( - Token containerToken) throws IOException { - org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token = - new 
org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>( - containerToken.getIdentifier() - .array(), containerToken.getPassword().array(), new Text( - containerToken.getKind()), - new Text(containerToken.getService())); - return token.decodeIdentifier(); - } - - public static Token newContainerToken(NodeId nodeId, byte[] password, - ContainerTokenIdentifier tokenIdentifier) { - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = - NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); - // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token - Token containerToken = - Token.newInstance(tokenIdentifier.getBytes(), - ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return containerToken; - } -}
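For reference, the deleted TestRPC above exercises the standard YarnRPC client/server pattern: select the protobuf RPC implementation via YarnConfiguration.IPC_RPC_IMPL, start a Server for ContainerManagementProtocol, then obtain a client proxy against the server's connect address. Below is a minimal sketch of that pattern, not part of this commit; it assumes the same YARN classpath, and the class name YarnRpcPatternSketch plus the idea of passing in any ContainerManagementProtocol stub (such as the test's DummyContainerManager) are illustrative only.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;

public class YarnRpcPatternSketch {
  /**
   * Starts a YARN RPC server around the given stub implementation and returns
   * a client proxy bound to the address the server actually came up on.
   */
  public static ContainerManagementProtocol startServerAndProxy(
      ContainerManagementProtocol stub) {
    Configuration conf = new Configuration();
    // Select the protobuf-based RPC implementation, as the removed test does.
    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName());
    YarnRPC rpc = YarnRPC.create(conf);
    // Bind to an ephemeral port on localhost.
    InetSocketAddress addr = NetUtils.createSocketAddr("localhost:0");
    Server server = rpc.getServer(ContainerManagementProtocol.class, stub,
        addr, conf, null, 1);
    server.start();
    // The proxy targets the connect address, i.e. the port the server chose.
    return (ContainerManagementProtocol) rpc.getProxy(
        ContainerManagementProtocol.class,
        NetUtils.getConnectAddress(server), conf);
  }
}

The startContainers, getContainerStatuses and stopContainers calls then go through the returned proxy exactly as in the removed test body.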
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java new file mode 100644 index 0000000..221969b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.Test; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + + +public class TestTimelineServiceRecords { + private static final Log LOG = + LogFactory.getLog(TestTimelineServiceRecords.class); + + @Test + public void testTimelineEntities() throws Exception { + TimelineEntity entity = new TimelineEntity(); + entity.setType("test type 1"); + entity.setId("test id 1"); + entity.addInfo("test info key 1", "test info value 1"); + entity.addInfo("test info key 2", + Arrays.asList("test info value 2", "test info value 3")); + entity.addInfo("test info key 3", true); + Assert.assertTrue( + entity.getInfo().get("test info key 3") instanceof Boolean); + entity.addConfig("test config key 1", "test config value 1"); + entity.addConfig("test config key 2", "test config value 2"); + + TimelineMetric metric1 = + new TimelineMetric(TimelineMetric.Type.TIME_SERIES); + metric1.setId("test metric id 1"); + metric1.addValue(1L, 1.0F); + metric1.addValue(3L, 3.0D); + metric1.addValue(2L, 2); + Assert.assertEquals(TimelineMetric.Type.TIME_SERIES, metric1.getType()); + Iterator<Map.Entry<Long, Number>> itr = + metric1.getValues().entrySet().iterator(); + Map.Entry<Long, Number> entry = itr.next(); + Assert.assertEquals(new Long(3L), entry.getKey()); + Assert.assertEquals(3.0D, entry.getValue()); + entry = itr.next(); + 
Assert.assertEquals(new Long(2L), entry.getKey()); + Assert.assertEquals(2, entry.getValue()); + entry = itr.next(); + Assert.assertEquals(new Long(1L), entry.getKey()); + Assert.assertEquals(1.0F, entry.getValue()); + Assert.assertFalse(itr.hasNext()); + entity.addMetric(metric1); + + TimelineMetric metric2 = + new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE); + metric2.setId("test metric id 1"); + metric2.addValue(3L, (short) 3); + Assert.assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric2.getType()); + Assert.assertTrue( + metric2.getValues().values().iterator().next() instanceof Short); + Map<Long, Number> points = new HashMap<>(); + points.put(4L, 4.0D); + points.put(5L, 5.0D); + try { + metric2.setValues(points); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + "Values cannot contain more than one point in")); + } + try { + metric2.addValues(points); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + "Values cannot contain more than one point in")); + } + entity.addMetric(metric2); + + TimelineMetric metric3 = + new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE); + metric3.setId("test metric id 1"); + metric3.addValue(4L, (short) 4); + Assert.assertEquals("metric3 should equal to metric2! ", metric3, metric2); + Assert.assertNotEquals("metric1 should not equal to metric2! ", + metric1, metric2); + + TimelineEvent event1 = new TimelineEvent(); + event1.setId("test event id 1"); + event1.addInfo("test info key 1", "test info value 1"); + event1.addInfo("test info key 2", + Arrays.asList("test info value 2", "test info value 3")); + event1.addInfo("test info key 3", true); + Assert.assertTrue( + event1.getInfo().get("test info key 3") instanceof Boolean); + event1.setTimestamp(1L); + entity.addEvent(event1); + + TimelineEvent event2 = new TimelineEvent(); + event2.setId("test event id 2"); + event2.addInfo("test info key 1", "test info value 1"); + event2.addInfo("test info key 2", + Arrays.asList("test info value 2", "test info value 3")); + event2.addInfo("test info key 3", true); + Assert.assertTrue( + event2.getInfo().get("test info key 3") instanceof Boolean); + event2.setTimestamp(2L); + entity.addEvent(event2); + + Assert.assertFalse("event1 should not equal to event2! ", + event1.equals(event2)); + TimelineEvent event3 = new TimelineEvent(); + event3.setId("test event id 1"); + event3.setTimestamp(1L); + Assert.assertEquals("event1 should equal to event3! ", event3, event1); + Assert.assertNotEquals("event1 should not equal to event2! ", + event1, event2); + + entity.setCreatedTime(0L); + entity.addRelatesToEntity("test type 2", "test id 2"); + entity.addRelatesToEntity("test type 3", "test id 3"); + entity.addIsRelatedToEntity("test type 4", "test id 4"); + entity.addIsRelatedToEntity("test type 5", "test id 5"); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true)); + + TimelineEntities entities = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + entities.addEntity(entity1); + TimelineEntity entity2 = new TimelineEntity(); + entities.addEntity(entity2); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true)); + + Assert.assertFalse("entity 1 should not be valid without type and id", + entity1.isValid()); + entity1.setId("test id 2"); + entity1.setType("test type 2"); + entity2.setId("test id 1"); + entity2.setType("test type 1"); + + Assert.assertEquals("Timeline entity should equal to entity2! 
", + entity, entity2); + Assert.assertNotEquals("entity1 should not equal to entity! ", + entity1, entity); + Assert.assertEquals("entity should be less than entity1! ", + entity1.compareTo(entity), 1); + Assert.assertEquals("entity's hash code should be -28727840 but not " + + entity.hashCode(), entity.hashCode(), -28727840); + } + + @Test + public void testFirstClassCitizenEntities() throws Exception { + UserEntity user = new UserEntity(); + user.setId("test user id"); + + QueueEntity queue = new QueueEntity(); + queue.setId("test queue id"); + + + ClusterEntity cluster = new ClusterEntity(); + cluster.setId("test cluster id"); + + FlowRunEntity flow1 = new FlowRunEntity(); + //flow1.setId("test flow id 1"); + flow1.setUser(user.getId()); + flow1.setName("test flow name 1"); + flow1.setVersion("test flow version 1"); + flow1.setRunId(1L); + + FlowRunEntity flow2 = new FlowRunEntity(); + //flow2.setId("test flow run id 2"); + flow2.setUser(user.getId()); + flow2.setName("test flow name 2"); + flow2.setVersion("test flow version 2"); + flow2.setRunId(2L); + + ApplicationEntity app1 = new ApplicationEntity(); + app1.setId(ApplicationId.newInstance(0, 1).toString()); + app1.setQueue(queue.getId()); + + ApplicationEntity app2 = new ApplicationEntity(); + app2.setId(ApplicationId.newInstance(0, 2).toString()); + app2.setQueue(queue.getId()); + + ApplicationAttemptEntity appAttempt = new ApplicationAttemptEntity(); + appAttempt.setId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1).toString()); + + ContainerEntity container = new ContainerEntity(); + container.setId(ContainerId.newContainerId( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1).toString()); + + cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), + flow1.getId()); + flow1 + .setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId()); + flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); + flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId()); + flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), + app1.getId()); + flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), + app2.getId()); + app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); + app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), + appAttempt.getId()); + appAttempt + .setParent(TimelineEntityType.YARN_APPLICATION.toString(), + app1.getId()); + app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); + appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(), + container.getId()); + container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), + appAttempt.getId()); + + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(cluster, true)); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(flow1, true)); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(flow2, true)); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(app1, true)); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(app2, true)); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(appAttempt, true)); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(container, true)); + + + // Check parent/children APIs + Assert.assertNotNull(app1.getParent()); + Assert.assertEquals(flow2.getType(), app1.getParent().getType()); + Assert.assertEquals(flow2.getId(), app1.getParent().getId()); + app1.addInfo(ApplicationEntity.PARENT_INFO_KEY, "invalid parent object"); + try { + app1.getParent(); + Assert.fail(); + } 
catch (Exception e) { + Assert.assertTrue(e instanceof YarnRuntimeException); + Assert.assertTrue(e.getMessage().contains( + "Parent info is invalid identifier object")); + } + + Assert.assertNotNull(app1.getChildren()); + Assert.assertEquals(1, app1.getChildren().size()); + Assert.assertEquals( + appAttempt.getType(), app1.getChildren().iterator().next().getType()); + Assert.assertEquals( + appAttempt.getId(), app1.getChildren().iterator().next().getId()); + app1.addInfo(ApplicationEntity.CHILDREN_INFO_KEY, + Collections.singletonList("invalid children set")); + try { + app1.getChildren(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof YarnRuntimeException); + Assert.assertTrue(e.getMessage().contains( + "Children info is invalid identifier set")); + } + app1.addInfo(ApplicationEntity.CHILDREN_INFO_KEY, + Collections.singleton("invalid child object")); + try { + app1.getChildren(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof YarnRuntimeException); + Assert.assertTrue(e.getMessage().contains( + "Children info contains invalid identifier object")); + } + } + + @Test + public void testUser() throws Exception { + UserEntity user = new UserEntity(); + user.setId("test user id"); + user.addInfo("test info key 1", "test info value 1"); + user.addInfo("test info key 2", "test info value 2"); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(user, true)); + } + + @Test + public void testQueue() throws Exception { + QueueEntity queue = new QueueEntity(); + queue.setId("test queue id"); + queue.addInfo("test info key 1", "test info value 1"); + queue.addInfo("test info key 2", "test info value 2"); + queue.setParent(TimelineEntityType.YARN_QUEUE.toString(), + "test parent queue id"); + queue.addChild(TimelineEntityType.YARN_QUEUE.toString(), + "test child queue id 1"); + queue.addChild(TimelineEntityType.YARN_QUEUE.toString(), + "test child queue id 2"); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(queue, true)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java new file mode 100644 index 0000000..5813340 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class TestTimelineClientV2Impl { + private static final Log LOG = + LogFactory.getLog(TestTimelineClientV2Impl.class); + private TestV2TimelineClient client; + private static final long TIME_TO_SLEEP = 150L; + private static final String EXCEPTION_MSG = "Exception in the content"; + + @Before + public void setup() { + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3); + if (!currTestName.getMethodName() + .contains("testRetryOnConnectionFailure")) { + client = createTimelineClient(conf); + } + } + + @Rule + public TestName currTestName = new TestName(); + private YarnConfiguration conf; + + private TestV2TimelineClient createTimelineClient(YarnConfiguration config) { + ApplicationId id = ApplicationId.newInstance(0, 0); + TestV2TimelineClient tc = new TestV2TimelineClient(id); + tc.init(config); + tc.start(); + return tc; + } + + private class TestV2TimelineClientForExceptionHandling + extends TimelineClientImpl { + public TestV2TimelineClientForExceptionHandling(ApplicationId id) { + super(id); + } + + private boolean throwYarnException; + + public void setThrowYarnException(boolean throwYarnException) { + this.throwYarnException = throwYarnException; + } + + public boolean isThrowYarnException() { + return throwYarnException; + } + + @Override + protected void putObjects(URI base, String path, + MultivaluedMap<String, String> params, Object obj) + throws IOException, YarnException { + if (throwYarnException) { + throw new YarnException(EXCEPTION_MSG); + } else { + throw new IOException( + "Failed to get the response from the timeline server."); + } + } + } + + private class TestV2TimelineClient + extends TestV2TimelineClientForExceptionHandling { + private boolean sleepBeforeReturn; + + private List<TimelineEntities> publishedEntities; + + public TimelineEntities getPublishedEntities(int putIndex) { + Assert.assertTrue("Not So many entities Published", + putIndex < publishedEntities.size()); + return publishedEntities.get(putIndex); + } + + public void setSleepBeforeReturn(boolean sleepBeforeReturn) { + this.sleepBeforeReturn = sleepBeforeReturn; + } + + public int getNumOfTimelineEntitiesPublished() { + return publishedEntities.size(); + } + + public TestV2TimelineClient(ApplicationId id) { + super(id); + publishedEntities = new ArrayList<TimelineEntities>(); + } + + protected void putObjects(String path, + MultivaluedMap<String, String> params, Object obj) + throws IOException, YarnException { + if (isThrowYarnException()) { + 
throw new YarnException("ActualException"); + } + publishedEntities.add((TimelineEntities) obj); + if (sleepBeforeReturn) { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + @Test + public void testExceptionMultipleRetry() { + TestV2TimelineClientForExceptionHandling c = + new TestV2TimelineClientForExceptionHandling( + ApplicationId.newInstance(0, 0)); + int maxRetries = 2; + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + maxRetries); + c.init(conf); + c.start(); + c.setTimelineServiceAddress("localhost:12345"); + try { + c.putEntities(new TimelineEntity()); + } catch (IOException e) { + Assert.fail("YARN exception is expected"); + } catch (YarnException e) { + Throwable cause = e.getCause(); + Assert.assertTrue("IOException is expected", + cause instanceof IOException); + Assert.assertTrue("YARN exception is expected", + cause.getMessage().contains( + "TimelineClient has reached to max retry times : " + maxRetries)); + } + + c.setThrowYarnException(true); + try { + c.putEntities(new TimelineEntity()); + } catch (IOException e) { + Assert.fail("YARN exception is expected"); + } catch (YarnException e) { + Throwable cause = e.getCause(); + Assert.assertTrue("YARN exception is expected", + cause instanceof YarnException); + Assert.assertTrue("YARN exception is expected", + cause.getMessage().contains(EXCEPTION_MSG)); + } + c.stop(); + } + + @Test + public void testPostEntities() throws Exception { + try { + client.putEntities(generateEntity("1")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testASyncCallMerge() throws Exception { + client.setSleepBeforeReturn(true); + try { + client.putEntitiesAsync(generateEntity("1")); + Thread.sleep(TIME_TO_SLEEP / 2); + // by the time first put response comes push 2 entities in the queue + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + for (int i = 0; i < 4; i++) { + if (client.getNumOfTimelineEntitiesPublished() == 2) { + break; + } + Thread.sleep(TIME_TO_SLEEP); + } + Assert.assertEquals("two merged TimelineEntities needs to be published", 2, + client.getNumOfTimelineEntitiesPublished()); + TimelineEntities secondPublishedEntities = client.getPublishedEntities(1); + Assert.assertEquals( + "Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2, + secondPublishedEntities.getEntities().size()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "2", + secondPublishedEntities.getEntities().get(0).getId()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "3", + secondPublishedEntities.getEntities().get(1).getId()); + } + + @Test + public void testSyncCall() throws Exception { + try { + // sync entity should not be be merged with Async + client.putEntities(generateEntity("1")); + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + // except for the sync call above 2 should be merged + client.putEntities(generateEntity("4")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + for (int i = 0; i < 4; i++) { + if (client.getNumOfTimelineEntitiesPublished() == 3) { + break; + } + Thread.sleep(TIME_TO_SLEEP); + } + printReceivedEntities(); + Assert.assertEquals("TimelineEntities not published as desired", 3, + client.getNumOfTimelineEntitiesPublished()); + 
TimelineEntities firstPublishedEntities = client.getPublishedEntities(0); + Assert.assertEquals("sync entities should not be merged with async", 1, + firstPublishedEntities.getEntities().size()); + + // test before pushing the sync entities asyncs are merged and pushed + TimelineEntities secondPublishedEntities = client.getPublishedEntities(1); + Assert.assertEquals( + "async entities should be merged before publishing sync", 2, + secondPublishedEntities.getEntities().size()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "2", + secondPublishedEntities.getEntities().get(0).getId()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "3", + secondPublishedEntities.getEntities().get(1).getId()); + + // test the last entity published is sync put + TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2); + Assert.assertEquals("sync entities had to be published at the last", 1, + thirdPublishedEntities.getEntities().size()); + Assert.assertEquals("Expected last sync Event is not proper", "4", + thirdPublishedEntities.getEntities().get(0).getId()); + } + + @Test + public void testExceptionCalls() throws Exception { + client.setThrowYarnException(true); + try { + client.putEntitiesAsync(generateEntity("1")); + } catch (YarnException e) { + Assert.fail("Async calls are not expected to throw exception"); + } + + try { + client.putEntities(generateEntity("2")); + Assert.fail("Sync calls are expected to throw exception"); + } catch (YarnException e) { + Assert.assertEquals("Same exception needs to be thrown", + "ActualException", e.getCause().getMessage()); + } + } + + @Test + public void testConfigurableNumberOfMerges() throws Exception { + client.setSleepBeforeReturn(true); + try { + // At max 3 entities need to be merged + client.putEntitiesAsync(generateEntity("1")); + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + client.putEntitiesAsync(generateEntity("4")); + client.putEntities(generateEntity("5")); + client.putEntitiesAsync(generateEntity("6")); + client.putEntitiesAsync(generateEntity("7")); + client.putEntitiesAsync(generateEntity("8")); + client.putEntitiesAsync(generateEntity("9")); + client.putEntitiesAsync(generateEntity("10")); + } catch (YarnException e) { + Assert.fail("No exception expected"); + } + // not having the same logic here as it doesn't depend on how many times + // events are published. + Thread.sleep(2 * TIME_TO_SLEEP); + printReceivedEntities(); + for (TimelineEntities publishedEntities : client.publishedEntities) { + Assert.assertTrue( + "Number of entities should not be greater than 3 for each publish," + + " but was " + publishedEntities.getEntities().size(), + publishedEntities.getEntities().size() <= 3); + } + } + + @Test + public void testAfterStop() throws Exception { + client.setSleepBeforeReturn(true); + try { + // At max 3 entities need to be merged + client.putEntities(generateEntity("1")); + for (int i = 2; i < 20; i++) { + client.putEntitiesAsync(generateEntity("" + i)); + } + client.stop(); + try { + client.putEntitiesAsync(generateEntity("50")); + Assert.fail("Exception expected"); + } catch (YarnException e) { + // expected + } + } catch (YarnException e) { + Assert.fail("No exception expected"); + } + // not having the same logic here as it doesn't depend on how many times + // events are published. 
+ for (int i = 0; i < 5; i++) { + TimelineEntities publishedEntities = + client.publishedEntities.get(client.publishedEntities.size() - 1); + TimelineEntity timelineEntity = publishedEntities.getEntities() + .get(publishedEntities.getEntities().size() - 1); + if (!timelineEntity.getId().equals("19")) { + Thread.sleep(2 * TIME_TO_SLEEP); + } + } + printReceivedEntities(); + TimelineEntities publishedEntities = + client.publishedEntities.get(client.publishedEntities.size() - 1); + TimelineEntity timelineEntity = publishedEntities.getEntities() + .get(publishedEntities.getEntities().size() - 1); + Assert.assertEquals("", "19", timelineEntity.getId()); + } + + private void printReceivedEntities() { + for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) { + TimelineEntities publishedEntities = client.getPublishedEntities(i); + StringBuilder entitiesPerPublish = new StringBuilder(); + for (TimelineEntity entity : publishedEntities.getEntities()) { + entitiesPerPublish.append(entity.getId()); + entitiesPerPublish.append(","); + } + LOG.info("Entities Published @ index " + i + " : " + + entitiesPerPublish.toString()); + } + } + + private static TimelineEntity generateEntity(String id) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(id); + entity.setType("testEntity"); + entity.setCreatedTime(System.currentTimeMillis()); + return entity; + } + + @After + public void tearDown() { + if (client != null) { + client.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java new file mode 100644 index 0000000..d3d815b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.util; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.junit.Assert; +import org.junit.Test; + +public class TestTimelineServiceHelper { + + @Test + public void testMapCastToHashMap() { + + // Test null map be casted to null + Map<String, String> nullMap = null; + Assert.assertNull(TimelineServiceHelper.mapCastToHashMap(nullMap)); + + // Test empty hashmap be casted to a empty hashmap + Map<String, String> emptyHashMap = new HashMap<String, String>(); + Assert.assertEquals( + TimelineServiceHelper.mapCastToHashMap(emptyHashMap).size(), 0); + + // Test empty non-hashmap be casted to a empty hashmap + Map<String, String> emptyTreeMap = new TreeMap<String, String>(); + Assert.assertEquals( + TimelineServiceHelper.mapCastToHashMap(emptyTreeMap).size(), 0); + + // Test non-empty hashmap be casted to hashmap correctly + Map<String, String> firstHashMap = new HashMap<String, String>(); + String key = "KEY"; + String value = "VALUE"; + firstHashMap.put(key, value); + Assert.assertEquals( + TimelineServiceHelper.mapCastToHashMap(firstHashMap), firstHashMap); + + // Test non-empty non-hashmap is casted correctly. + Map<String, String> firstTreeMap = new TreeMap<String, String>(); + firstTreeMap.put(key, value); + HashMap<String, String> alternateHashMap = + TimelineServiceHelper.mapCastToHashMap(firstTreeMap); + Assert.assertEquals(firstTreeMap.size(), alternateHashMap.size()); + Assert.assertEquals(alternateHashMap.get(key), value); + + // Test complicated hashmap be casted correctly + Map<String, Set<String>> complicatedHashMap = + new HashMap<String, Set<String>>(); + Set<String> hashSet = new HashSet<String>(); + hashSet.add(value); + complicatedHashMap.put(key, hashSet); + Assert.assertEquals( + TimelineServiceHelper.mapCastToHashMap(complicatedHashMap), + complicatedHashMap); + + // Test complicated non-hashmap get casted correctly + Map<String, Set<String>> complicatedTreeMap = + new TreeMap<String, Set<String>>(); + complicatedTreeMap.put(key, hashSet); + Assert.assertEquals( + TimelineServiceHelper.mapCastToHashMap(complicatedTreeMap).get(key), + hashSet); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 8a17000..4b4c11d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -150,6 +150,7 @@ <include>yarn_server_common_service_protos.proto</include> <include>ResourceTracker.proto</include> <include>SCMUploader.proto</include> + <include>collectornodemanager_protocol.proto</include> </includes> </source> <output>${project.build.directory}/generated-sources/java</output> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java new file mode 100644 index 0000000..64eea63 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; + +/** + * <p>The protocol between an <code>TimelineCollectorManager</code> and a + * <code>NodeManager</code> to report a new application collector get launched. + * </p> + * + */ +@Private +public interface CollectorNodemanagerProtocol { + + /** + * + * <p> + * The <code>TimelineCollectorManager</code> provides a list of mapping + * between application and collector's address in + * {@link ReportNewCollectorInfoRequest} to a <code>NodeManager</code> to + * <em>register</em> collector's info, include: applicationId and REST URI to + * access collector. NodeManager will add them into registered collectors + * and register them into <code>ResourceManager</code> afterwards. + * </p> + * + * @param request the request of registering a new collector or a list of + * collectors + * @return the response for registering the new collector + * @throws YarnException if the request is invalid + * @throws IOException if there are I/O errors + */ + ReportNewCollectorInfoResponse reportNewCollectorInfo( + ReportNewCollectorInfoRequest request) + throws YarnException, IOException; + + /** + * <p> + * The collector needs to get the context information including user, flow + * and flow run ID to associate with every incoming put-entity requests. 
+ * </p> + * @param request the request of getting the aggregator context information of + * the given application + * @return the response for registering the new collector + * @throws YarnException if the request is invalid + * @throws IOException if there are I/O errors + */ + GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java new file mode 100644 index 0000000..24f7c3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.CollectorNodemanagerProtocol.CollectorNodemanagerProtocolService; + +@Private +@Unstable +@ProtocolInfo( + protocolName = + "org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB", + protocolVersion = 1) +public interface CollectorNodemanagerProtocolPB extends + CollectorNodemanagerProtocolService.BlockingInterface { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java new file mode 100644 index 0000000..bc50ac5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; + +import com.google.protobuf.ServiceException; + +public class CollectorNodemanagerProtocolPBClientImpl implements + CollectorNodemanagerProtocol, Closeable { + + // Not a documented config. Only used for tests internally + static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX + + "rpc.nm-command-timeout"; + + /** + * Maximum of 1 minute timeout for a Node to react to the command. 
+ */ + static final int DEFAULT_COMMAND_TIMEOUT = 60000; + + private CollectorNodemanagerProtocolPB proxy; + + @Private + public CollectorNodemanagerProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, CollectorNodemanagerProtocolPB.class, + ProtobufRpcEngine.class); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT); + proxy = + (CollectorNodemanagerProtocolPB) RPC.getProxy( + CollectorNodemanagerProtocolPB.class, + clientVersion, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), expireIntvl); + } + + @Override + public ReportNewCollectorInfoResponse reportNewCollectorInfo( + ReportNewCollectorInfoRequest request) throws YarnException, IOException { + + ReportNewCollectorInfoRequestProto requestProto = + ((ReportNewCollectorInfoRequestPBImpl) request).getProto(); + try { + return new ReportNewCollectorInfoResponsePBImpl( + proxy.reportNewCollectorInfo(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + GetTimelineCollectorContextRequestProto requestProto = + ((GetTimelineCollectorContextRequestPBImpl) request).getProto(); + try { + return new GetTimelineCollectorContextResponsePBImpl( + proxy.getTimelineCollectorContext(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java new file mode 100644 index 0000000..7b93a68 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class CollectorNodemanagerProtocolPBServiceImpl implements + CollectorNodemanagerProtocolPB { + + private CollectorNodemanagerProtocol real; + + public CollectorNodemanagerProtocolPBServiceImpl( + CollectorNodemanagerProtocol impl) { + this.real = impl; + } + + @Override + public ReportNewCollectorInfoResponseProto reportNewCollectorInfo( + RpcController arg0, ReportNewCollectorInfoRequestProto proto) + throws ServiceException { + ReportNewCollectorInfoRequestPBImpl request = + new ReportNewCollectorInfoRequestPBImpl(proto); + try { + ReportNewCollectorInfoResponse response = + real.reportNewCollectorInfo(request); + return ((ReportNewCollectorInfoResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetTimelineCollectorContextResponseProto getTimelineCollectorContext( + RpcController controller, + GetTimelineCollectorContextRequestProto proto) throws ServiceException { + GetTimelineCollectorContextRequestPBImpl request = + new GetTimelineCollectorContextRequestPBImpl(proto); + try { + GetTimelineCollectorContextResponse response = + real.getTimelineCollectorContext(request); + return ((GetTimelineCollectorContextResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
new file mode 100644
index 0000000..604a40b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextRequest {
+
+  public static GetTimelineCollectorContextRequest newInstance(
+      ApplicationId appId) {
+    GetTimelineCollectorContextRequest request =
+        Records.newRecord(GetTimelineCollectorContextRequest.class);
+    request.setApplicationId(appId);
+    return request;
+  }
+
+  public abstract ApplicationId getApplicationId();
+
+  public abstract void setApplicationId(ApplicationId appId);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
new file mode 100644
index 0000000..bd5c11e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextResponse {
+
+  public static GetTimelineCollectorContextResponse newInstance(
+      String userId, String flowName, String flowVersion, long flowRunId) {
+    GetTimelineCollectorContextResponse response =
+        Records.newRecord(GetTimelineCollectorContextResponse.class);
+    response.setUserId(userId);
+    response.setFlowName(flowName);
+    response.setFlowVersion(flowVersion);
+    response.setFlowRunId(flowRunId);
+    return response;
+  }
+
+  public abstract String getUserId();
+
+  public abstract void setUserId(String userId);
+
+  public abstract String getFlowName();
+
+  public abstract void setFlowName(String flowName);
+
+  public abstract String getFlowVersion();
+
+  public abstract void setFlowVersion(String flowVersion);
+
+  public abstract long getFlowRunId();
+
+  public abstract void setFlowRunId(long flowRunId);
+}
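Taken together, the request/response records above define the flow-context lookup a per-app timeline collector performs. A brief illustrative fragment, not part of this commit, showing how the response is built on the answering side and read back by the caller (values are placeholders):

    // Build a response the way an answering handler would.
    GetTimelineCollectorContextResponse ctx =
        GetTimelineCollectorContextResponse.newInstance(
            "alice",             // user who submitted the app
            "distributed-grep",  // flow name
            "1",                 // flow version
            1425016501000L);     // flow run id

    // Read it back the way the collector that issued
    // GetTimelineCollectorContextRequest.newInstance(appId) would.
    String user = ctx.getUserId();
    long runId = ctx.getFlowRunId();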
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index 84ca8a4..c795e55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -42,6 +44,22 @@ public abstract class NodeHeartbeatRequest {
     return nodeHeartbeatRequest;
   }
 
+  public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+      MasterKey lastKnownContainerTokenMasterKey,
+      MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
+      Map<ApplicationId, String> registeredCollectors) {
+    NodeHeartbeatRequest nodeHeartbeatRequest =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+    nodeHeartbeatRequest
+        .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+    nodeHeartbeatRequest
+        .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+    nodeHeartbeatRequest.setNodeLabels(nodeLabels);
+    nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors);
+    return nodeHeartbeatRequest;
+  }
+
   public abstract NodeStatus getNodeStatus();
 
   public abstract void setNodeStatus(NodeStatus status);
@@ -59,4 +77,9 @@ public abstract class NodeHeartbeatRequest {
 
   public abstract void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationReportsForApps);
+
+  // This tells RM registered collectors' address info on this node
+  public abstract Map<ApplicationId, String> getRegisteredCollectors();
+  public abstract void setRegisteredCollectors(Map<ApplicationId,
+      String> appCollectorsMap);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 386353a..09cafaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -41,6 +41,10 @@
 
   List<ApplicationId> getApplicationsToCleanup();
 
+  // This tells NM the collectors' address info of related apps
+  Map<ApplicationId, String> getAppCollectorsMap();
+  void setAppCollectorsMap(Map<ApplicationId, String> appCollectorsMap);
+
   void setResponseId(int responseId);
 
   void setNodeAction(NodeAction action);
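The two heartbeat changes above form the discovery channel for collectors: the NM advertises the collectors it hosts via registeredCollectors, and the RM returns its aggregated view in the response. A hedged NM-side fragment, not part of the commit; the node status, master keys and labels are assumed to come from the existing heartbeat path, and the address is illustrative:

    // Advertise the per-app collector running on this node in the next heartbeat.
    Map<ApplicationId, String> registeredCollectors = new HashMap<>();
    registeredCollectors.put(
        ApplicationId.newInstance(System.currentTimeMillis(), 1), "localhost:8989");

    NodeHeartbeatRequest heartbeat = NodeHeartbeatRequest.newInstance(
        nodeStatus, lastKnownContainerTokenMasterKey, lastKnownNMTokenMasterKey,
        nodeLabels, registeredCollectors);

    // After the RPC round trip, the NM can pick up the RM's view for all its apps:
    // Map<ApplicationId, String> fromRM = response.getAppCollectorsMap();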
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+public abstract class ReportNewCollectorInfoRequest {
+
+  public static ReportNewCollectorInfoRequest newInstance(
+      List<AppCollectorsMap> appCollectorsList) {
+    ReportNewCollectorInfoRequest request =
+        Records.newRecord(ReportNewCollectorInfoRequest.class);
+    request.setAppCollectorsList(appCollectorsList);
+    return request;
+  }
+
+  public static ReportNewCollectorInfoRequest newInstance(
+      ApplicationId id, String collectorAddr) {
+    ReportNewCollectorInfoRequest request =
+        Records.newRecord(ReportNewCollectorInfoRequest.class);
+    request.setAppCollectorsList(
+        Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr)));
+    return request;
+  }
+
+  public abstract List<AppCollectorsMap> getAppCollectorsList();
+
+  public abstract void setAppCollectorsList(
+      List<AppCollectorsMap> appCollectorsList);
+
+}
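A short illustrative fragment (not part of the commit) showing how this record might be used together with the PB client shown earlier; the address is a placeholder and exception handling is elided:

    // Report a freshly started per-app collector to the NM's collector service.
    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 2);
    ReportNewCollectorInfoRequest report =
        ReportNewCollectorInfoRequest.newInstance(appId, "localhost:8989");
    client.reportNewCollectorInfo(report);  // 'client' as in the earlier client sketch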
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class ReportNewCollectorInfoResponse {
+
+  @Private
+  public static ReportNewCollectorInfoResponse newInstance() {
+    ReportNewCollectorInfoResponse response =
+        Records.newRecord(ReportNewCollectorInfoResponse.class);
+    return response;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
new file mode 100644
index 0000000..7014388
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; + +public class GetTimelineCollectorContextRequestPBImpl extends + GetTimelineCollectorContextRequest { + + private GetTimelineCollectorContextRequestProto + proto = GetTimelineCollectorContextRequestProto.getDefaultInstance(); + private GetTimelineCollectorContextRequestProto.Builder builder = null; + private boolean viaProto = false; + + private ApplicationId appId = null; + + public GetTimelineCollectorContextRequestPBImpl() { + builder = GetTimelineCollectorContextRequestProto.newBuilder(); + } + + public GetTimelineCollectorContextRequestPBImpl( + GetTimelineCollectorContextRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetTimelineCollectorContextRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (appId != null) { + builder.setAppId(convertToProtoFormat(this.appId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetTimelineCollectorContextRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationId getApplicationId() { + if (this.appId != null) { + return this.appId; + } + + GetTimelineCollectorContextRequestProtoOrBuilder p = + viaProto ? 
proto : builder; + if (!p.hasAppId()) { + return null; + } + + this.appId = convertFromProtoFormat(p.getAppId()); + return this.appId; + } + + @Override + public void setApplicationId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) { + builder.clearAppId(); + } + this.appId = id; + } + + private ApplicationIdPBImpl convertFromProtoFormat( + YarnProtos.ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl)t).getProto(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java new file mode 100644 index 0000000..151b036 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; + +public class GetTimelineCollectorContextResponsePBImpl extends + GetTimelineCollectorContextResponse { + + private GetTimelineCollectorContextResponseProto proto = + GetTimelineCollectorContextResponseProto.getDefaultInstance(); + private GetTimelineCollectorContextResponseProto.Builder builder = null; + private boolean viaProto = false; + + public GetTimelineCollectorContextResponsePBImpl() { + builder = GetTimelineCollectorContextResponseProto.newBuilder(); + } + + public GetTimelineCollectorContextResponsePBImpl( + GetTimelineCollectorContextResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetTimelineCollectorContextResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetTimelineCollectorContextResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getUserId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasUserId()) { + return null; + } + return p.getUserId(); + } + + @Override + public void setUserId(String userId) { + maybeInitBuilder(); + if (userId == null) { + builder.clearUserId(); + return; + } + builder.setUserId(userId); + } + + @Override + public String getFlowName() { + GetTimelineCollectorContextResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasFlowName()) { + return null; + } + return p.getFlowName(); + } + + @Override + public void setFlowName(String flowName) { + maybeInitBuilder(); + if (flowName == null) { + builder.clearFlowName(); + return; + } + builder.setFlowName(flowName); + } + + @Override + public String getFlowVersion() { + GetTimelineCollectorContextResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasFlowVersion()) { + return null; + } + return p.getFlowVersion(); + } + + @Override + public void setFlowVersion(String flowVersion) { + maybeInitBuilder(); + if (flowVersion == null) { + builder.clearFlowVersion(); + return; + } + builder.setFlowVersion(flowVersion); + } + + @Override + public long getFlowRunId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = + viaProto ? 
+ proto : builder; + return p.getFlowRunId(); + } + + @Override + public void setFlowRunId(long flowRunId) { + maybeInitBuilder(); + builder.setFlowRunId(flowRunId); + } +}
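The two *PBImpl classes above follow the usual YARN record pattern: the record mirrors its state into a protobuf builder lazily, and getProto() freezes it for the wire. A small hedged illustration, not part of the commit, of the round trip these implementations enable; only APIs introduced in this change are used and values are placeholders:

    // Illustrative round trip between the record API and its protobuf form.
    GetTimelineCollectorContextRequestPBImpl request =
        new GetTimelineCollectorContextRequestPBImpl();
    request.setApplicationId(ApplicationId.newInstance(System.currentTimeMillis(), 3));

    // Client side: serialize to proto, as CollectorNodemanagerProtocolPBClientImpl does.
    GetTimelineCollectorContextRequestProto wire = request.getProto();

    // Server side: rebuild the record from the proto, as
    // CollectorNodemanagerProtocolPBServiceImpl does before calling the real impl.
    GetTimelineCollectorContextRequest received =
        new GetTimelineCollectorContextRequestPBImpl(wire);
    ApplicationId appId = received.getApplicationId();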