YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06105e32 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06105e32 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06105e32 Branch: refs/heads/YARN-5355_branch2 Commit: 06105e326ca10632d24936f99822f7224f84fbdd Parents: b27a9df Author: Rohith Sharma K S <rohithsharm...@apache.org> Authored: Fri Aug 11 12:35:35 2017 +0530 Committer: Varun Saxena <varunsax...@apache.org> Committed: Sat Oct 14 15:12:18 2017 +0530 ---------------------------------------------------------------------- .../v2/app/rm/RMContainerAllocator.java | 9 +- .../app/local/TestLocalContainerAllocator.java | 2 +- .../api/protocolrecords/AllocateResponse.java | 92 +++++++++--- .../hadoop/yarn/api/records/CollectorInfo.java | 55 +++++++ .../src/main/proto/yarn_protos.proto | 5 + .../src/main/proto/yarn_service_protos.proto | 2 +- .../api/async/impl/AMRMClientAsyncImpl.java | 6 +- .../ApplicationMasterServiceProtoTestBase.java | 72 +++++++++ .../hadoop/yarn/client/ProtocolHATestBase.java | 22 ++- ...ationMasterServiceProtocolForTimelineV2.java | 71 +++++++++ ...estApplicationMasterServiceProtocolOnHA.java | 46 +----- .../api/async/impl/TestAMRMClientAsync.java | 4 +- .../impl/pb/AllocateResponsePBImpl.java | 37 ++++- .../records/impl/pb/CollectorInfoPBImpl.java | 148 +++++++++++++++++++ .../hadoop/yarn/api/TestPBImplRecords.java | 2 + .../ReportNewCollectorInfoRequest.java | 5 +- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 25 +++- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 21 ++- .../pb/ReportNewCollectorInfoRequestPBImpl.java | 4 +- .../server/api/records/AppCollectorData.java | 27 +++- .../records/impl/pb/AppCollectorDataPBImpl.java | 29 +++- .../yarn_server_common_service_protos.proto | 2 + .../java/org/apache/hadoop/yarn/TestRPC.java | 30 +++- .../hadoop/yarn/TestYarnServerApiClasses.java | 4 +- .../yarn/server/MockResourceManagerFacade.java | 9 +- .../nodemanager/NodeStatusUpdaterImpl.java | 1 - .../application/ApplicationImpl.java | 2 +- .../ApplicationMasterService.java | 3 +- .../resourcemanager/DefaultAMSProcessor.java | 7 +- .../server/resourcemanager/rmapp/RMApp.java | 15 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 10 +- .../applicationsmanager/MockAsm.java | 6 + .../server/resourcemanager/rmapp/MockRMApp.java | 7 +- .../TestTimelineServiceClientIntegration.java | 2 +- .../security/TestTimelineAuthFilterForV2.java | 121 +++++++++++---- .../collector/AppLevelTimelineCollector.java | 24 +++ .../AppLevelTimelineCollectorWithAgg.java | 4 +- .../collector/NodeTimelineCollectorManager.java | 83 +++++++++-- .../PerNodeTimelineCollectorsAuxService.java | 7 +- ...neV2DelegationTokenSecretManagerService.java | 31 ++++ .../TestNMTimelineCollectorManager.java | 4 +- 41 files changed, 892 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 6b52a0e..67f8b29 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -860,13 +860,16 @@ public class RMContainerAllocator extends RMContainerRequestor handleUpdatedNodes(response); handleJobPriorityChange(response); // handle receiving the timeline collector address for this app - String collectorAddr = response.getCollectorAddr(); + String collectorAddr = null; + if (response.getCollectorInfo() != null) { + collectorAddr = response.getCollectorInfo().getCollectorAddr(); + } + MRAppMaster.RunningAppContext appContext = (MRAppMaster.RunningAppContext)this.getContext(); if (collectorAddr != null && !collectorAddr.isEmpty() && appContext.getTimelineV2Client() != null) { - appContext.getTimelineV2Client().setTimelineServiceAddress( - response.getCollectorAddr()); + appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr); } for (ContainerStatus cont : finishedContainers) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java index 3fa0043..9549b68 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java @@ -297,7 +297,7 @@ public class TestLocalContainerAllocator { Resources.none(), null, 1, null, Collections.<NMToken>emptyList(), yarnToken, - Collections.<UpdatedContainer>emptyList()); + Collections.<UpdatedContainer>emptyList(), null); response.setApplicationPriority(Priority.newInstance(0)); return response; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index c369c3c..b2b40a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; @@ -96,7 +97,8 @@ public abstract class AllocateResponse { /** * Use {@link AllocateResponse#newInstance(int, List, List, List, Resource, - * AMCommand, int, PreemptionMessage, List, Token, List)} instead + * AMCommand, int, PreemptionMessage, List, Token, List, CollectorInfo)} + * instead. * @param responseId responseId * @param completedContainers completedContainers * @param allocatedContainers allocatedContainers @@ -117,10 +119,14 @@ public abstract class AllocateResponse { Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens, List<ContainerResourceIncrease> increasedContainers, - List<ContainerResourceDecrease> decreasedContainers) { - return newInstance(responseId, completedContainers, allocatedContainers, - updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens); + List<ContainerResourceDecrease> decreasedContainers, + CollectorInfo collectorInfo) { + return AllocateResponse.newBuilder().responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers) + .updatedNodes(updatedNodes).availableResources(availResources) + .amCommand(command).nmTokens(nmTokens).collectorInfo(collectorInfo) + .build(); } @Public @@ -147,14 +153,15 @@ public abstract class AllocateResponse { List<Container> allocatedContainers, List<NodeReport> updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, - List<UpdatedContainer> updatedContainers) { + List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) { return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) .responseId(responseId) .completedContainersStatuses(completedContainers) .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) .availableResources(availResources).amCommand(command) .preemptionMessage(preempt).nmTokens(nmTokens) - .updatedContainers(updatedContainers).amRmToken(amRMToken).build(); + .updatedContainers(updatedContainers).amRmToken(amRMToken) + .collectorInfo(collectorInfo).build(); } /** @@ -346,6 +353,20 @@ public abstract class AllocateResponse { public abstract void setApplicationPriority(Priority priority); /** + * The data associated with the collector that belongs to this app. Contains + * address and token alongwith identification information. + * + * @return The data of collector that belong to this attempt + */ + @Public + @Unstable + public abstract CollectorInfo getCollectorInfo(); + + @Private + @Unstable + public abstract void setCollectorInfo(CollectorInfo info); + + /** * Get the list of container update errors to inform the * Application Master about the container updates that could not be * satisfied due to error. @@ -544,6 +565,50 @@ public abstract class AllocateResponse { } /** + * Set the <code>applicationPriority</code> of the response. + * @see AllocateResponse#setApplicationPriority(Priority) + * @param applicationPriority + * <code>applicationPriority</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder applicationPriority( + Priority applicationPriority) { + allocateResponse.setApplicationPriority(applicationPriority); + return this; + } + + /** + * Set the <code>collectorInfo</code> of the response. + * @see AllocateResponse#setCollectorInfo(CollectorInfo) + * @param collectorInfo <code>collectorInfo</code> of the response which + * contains collector address, RM id, version and collector token. + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder collectorInfo( + CollectorInfo collectorInfo) { + allocateResponse.setCollectorInfo(collectorInfo); + return this; + } + + /** + * Set the <code>updateErrors</code> of the response. + * @see AllocateResponse#setUpdateErrors(List) + * @param updateErrors <code>updateErrors</code> of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder updateErrors( + List<UpdateContainerError> updateErrors) { + allocateResponse.setUpdateErrors(updateErrors); + return this; + } + + /** * Return generated {@link AllocateResponse} object. * @return {@link AllocateResponse} */ @@ -568,17 +633,4 @@ public abstract class AllocateResponse { @Deprecated public abstract List<ContainerResourceDecrease> getDecreasedContainers(); - /** - * The address of collector that belong to this app - * - * @return The address of collector that belong to this attempt - */ - @Public - @Unstable - public abstract String getCollectorAddr(); - - @Private - @Unstable - public abstract void setCollectorAddr(String collectorAddr); - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java new file mode 100644 index 0000000..960c992 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.util.Records; + +/** + * Collector info containing collector address and collector token passed from + * RM to AM in Allocate Response. + */ +@Private +@InterfaceStability.Unstable +public abstract class CollectorInfo { + + protected static final long DEFAULT_TIMESTAMP_VALUE = -1; + + public static CollectorInfo newInstance(String collectorAddr, Token token) { + CollectorInfo amCollectorInfo = + Records.newRecord(CollectorInfo.class); + amCollectorInfo.setCollectorAddr(collectorAddr); + amCollectorInfo.setCollectorToken(token); + return amCollectorInfo; + } + + public abstract String getCollectorAddr(); + + public abstract void setCollectorAddr(String addr); + + /** + * Get delegation token for app collector which AM will use to publish + * entities. + * @return the delegation token for app collector. + */ + public abstract Token getCollectorToken(); + + public abstract void setCollectorToken(Token token); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index a3ad1e3..07b8335 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -667,3 +667,8 @@ message ContainerResourceDecreaseProto { optional ContainerIdProto container_id = 1; optional ResourceProto capability = 2; } + +message CollectorInfoProto { + optional string collector_addr = 1; + optional hadoop.common.TokenProto collector_token = 2; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index bc2330d..8301971 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -115,7 +115,7 @@ message AllocateResponseProto { repeated ContainerResourceDecreaseProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; optional PriorityProto application_priority = 13; - optional string collector_addr = 14; + optional CollectorInfoProto collector_info = 14; repeated UpdateContainerErrorProto update_errors = 15; repeated UpdatedContainerProto updated_containers = 16; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 1691c80..29d64db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -324,7 +324,11 @@ extends AMRMClientAsync<T> { AllocateResponse response = (AllocateResponse) object; - String collectorAddress = response.getCollectorAddr(); + String collectorAddress = null; + if (response.getCollectorInfo() != null) { + collectorAddress = response.getCollectorInfo().getCollectorAddr(); + } + TimelineV2Client timelineClient = client.getRegisteredTimelineV2Client(); if (timelineClient != null && collectorAddress != null http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java new file mode 100644 index 0000000..4521018 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java @@ -0,0 +1,72 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; + +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; + +/** + * Test Base for Application Master Service Protocol. + */ +public abstract class ApplicationMasterServiceProtoTestBase + extends ProtocolHATestBase { + + private ApplicationMasterProtocol amClient; + private ApplicationAttemptId attemptId; + + protected void startupHAAndSetupClient() throws Exception { + attemptId = this.cluster.createFakeApplicationAttemptId(); + + Token<AMRMTokenIdentifier> appToken = + this.cluster.getResourceManager().getRMContext() + .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); + appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf)); + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.getCurrentUser().addToken(appToken); + syncToken(appToken); + amClient = ClientRMProxy + .createRMProxy(this.conf, ApplicationMasterProtocol.class); + } + + @After + public void shutDown() { + if(this.amClient != null) { + RPC.stopProxy(this.amClient); + } + } + + protected ApplicationMasterProtocol getAMClient() { + return amClient; + } + + private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException { + for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { + this.cluster.getResourceManager(i).getRMContext() + .getAMRMTokenSecretManager().addPersistedPassword(token); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index a8e9132..efb1987 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -98,6 +99,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -804,11 +806,21 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { } public AllocateResponse createFakeAllocateResponse() { - return AllocateResponse.newInstance(-1, - new ArrayList<ContainerStatus>(), - new ArrayList<Container>(), new ArrayList<NodeReport>(), - Resource.newInstance(1024, 2), null, 1, - null, new ArrayList<NMToken>()); + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + return AllocateResponse.newInstance(-1, + new ArrayList<ContainerStatus>(), new ArrayList<Container>(), + new ArrayList<NodeReport>(), Resource.newInstance(1024, 2), null, 1, + null, new ArrayList<NMToken>(), null, + new ArrayList<UpdatedContainer>(), + CollectorInfo.newInstance("host:port", Token.newInstance( + new byte[] {0}, "TIMELINE", new byte[] {0}, "rm"))); + } else { + return AllocateResponse.newInstance(-1, + new ArrayList<ContainerStatus>(), + new ArrayList<Container>(), new ArrayList<NodeReport>(), + Resource.newInstance(1024, 2), null, 1, + null, new ArrayList<NMToken>()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java new file mode 100644 index 0000000..be8c302 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java @@ -0,0 +1,71 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests Application Master Protocol with timeline service v2 enabled. + */ +public class TestApplicationMasterServiceProtocolForTimelineV2 + extends ApplicationMasterServiceProtoTestBase { + + @Before + public void initialize() throws Exception { + HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf); + HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + startHACluster(0, false, false, true); + super.startupHAAndSetupClient(); + } + + @Test(timeout = 15000) + public void testAllocateForTimelineV2OnHA() + throws YarnException, IOException { + AllocateRequest request = AllocateRequest.newInstance(0, 50f, + new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>(), + ResourceBlacklistRequest.newInstance(new ArrayList<String>(), + new ArrayList<String>())); + AllocateResponse response = getAMClient().allocate(request); + Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); + Assert.assertNotNull(response.getCollectorInfo()); + Assert.assertEquals("host:port", + response.getCollectorInfo().getCollectorAddr()); + Assert.assertNotNull(response.getCollectorInfo().getCollectorToken()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java index ad86fb3..c2f39a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java @@ -23,10 +23,6 @@ import java.util.ArrayList; import org.junit.Assert; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -34,45 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.junit.After; import org.junit.Before; import org.junit.Test; public class TestApplicationMasterServiceProtocolOnHA - extends ProtocolHATestBase { - private ApplicationMasterProtocol amClient; - private ApplicationAttemptId attemptId ; - + extends ApplicationMasterServiceProtoTestBase { @Before public void initialize() throws Exception { startHACluster(0, false, false, true); - attemptId = this.cluster.createFakeApplicationAttemptId(); - - Token<AMRMTokenIdentifier> appToken = - this.cluster.getResourceManager().getRMContext() - .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf)); - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); - UserGroupInformation.getCurrentUser().addToken(appToken); - syncToken(appToken); - - amClient = ClientRMProxy - .createRMProxy(this.conf, ApplicationMasterProtocol.class); - } - - @After - public void shutDown() { - if(this.amClient != null) { - RPC.stopProxy(this.amClient); - } + super.startupHAAndSetupClient(); } @Test(timeout = 15000) @@ -81,7 +52,7 @@ public class TestApplicationMasterServiceProtocolOnHA RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance("localhost", 0, ""); RegisterApplicationMasterResponse response = - amClient.registerApplicationMaster(request); + getAMClient().registerApplicationMaster(request); Assert.assertEquals(response, this.cluster.createFakeRegisterApplicationMasterResponse()); } @@ -93,7 +64,7 @@ public class TestApplicationMasterServiceProtocolOnHA FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.SUCCEEDED, "", ""); FinishApplicationMasterResponse response = - amClient.finishApplicationMaster(request); + getAMClient().finishApplicationMaster(request); Assert.assertEquals(response, this.cluster.createFakeFinishApplicationMasterResponse()); } @@ -105,14 +76,7 @@ public class TestApplicationMasterServiceProtocolOnHA new ArrayList<ContainerId>(), ResourceBlacklistRequest.newInstance(new ArrayList<String>(), new ArrayList<String>())); - AllocateResponse response = amClient.allocate(request); + AllocateResponse response = getAMClient().allocate(request); Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); } - - private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException { - for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { - this.cluster.getResourceManager(i).getRMContext() - .getAMRMTokenSecretManager().addPersistedPassword(token); - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index ba38340..9c64412 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -426,8 +426,8 @@ public class TestAMRMClientAsync { } AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, - updatedContainers); + new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, null, + updatedContainers, null); return response; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 05faef8..06cc45b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; @@ -82,6 +85,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { private PreemptionMessage preempt; private Token amrmToken = null; private Priority appPriority = null; + private CollectorInfo collectorInfo = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -164,6 +168,9 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (this.amrmToken != null) { builder.setAmRmToken(convertToProtoFormat(this.amrmToken)); } + if (this.collectorInfo != null) { + builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo)); + } if (this.appPriority != null) { builder.setApplicationPriority(convertToProtoFormat(this.appPriority)); } @@ -410,19 +417,25 @@ public class AllocateResponsePBImpl extends AllocateResponse { @Override - public synchronized String getCollectorAddr() { + public synchronized CollectorInfo getCollectorInfo() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - return p.getCollectorAddr(); + if (this.collectorInfo != null) { + return this.collectorInfo; + } + if (!p.hasCollectorInfo()) { + return null; + } + this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo()); + return this.collectorInfo; } @Override - public synchronized void setCollectorAddr(String collectorAddr) { + public synchronized void setCollectorInfo(CollectorInfo info) { maybeInitBuilder(); - if (collectorAddr == null) { - builder.clearCollectorAddr(); - return; + if (info == null) { + builder.clearCollectorInfo(); } - builder.setCollectorAddr(collectorAddr); + this.collectorInfo = info; } @Override @@ -730,6 +743,16 @@ public class AllocateResponsePBImpl extends AllocateResponse { return ((NodeReportPBImpl)t).getProto(); } + private synchronized CollectorInfoPBImpl convertFromProtoFormat( + CollectorInfoProto p) { + return new CollectorInfoPBImpl(p); + } + + private synchronized CollectorInfoProto convertToProtoFormat( + CollectorInfo t) { + return ((CollectorInfoPBImpl)t).getProto(); + } + private synchronized ContainerPBImpl convertFromProtoFormat( ContainerProto p) { return new ContainerPBImpl(p); http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java new file mode 100644 index 0000000..bb54133 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java @@ -0,0 +1,148 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.records.CollectorInfo; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto; +import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +/** + * Protocol record implementation of {@link CollectorInfo}. + */ +public class CollectorInfoPBImpl extends CollectorInfo { + + private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance(); + + private CollectorInfoProto.Builder builder = null; + private boolean viaProto = false; + + private String collectorAddr = null; + private Token collectorToken = null; + + + public CollectorInfoPBImpl() { + builder = CollectorInfoProto.newBuilder(); + } + + public CollectorInfoPBImpl(CollectorInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public CollectorInfoProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = CollectorInfoProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public String getCollectorAddr() { + CollectorInfoProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorAddr == null && p.hasCollectorAddr()) { + this.collectorAddr = p.getCollectorAddr(); + } + return this.collectorAddr; + } + + @Override + public void setCollectorAddr(String addr) { + maybeInitBuilder(); + if (collectorAddr == null) { + builder.clearCollectorAddr(); + } + this.collectorAddr = addr; + } + + @Override + public Token getCollectorToken() { + CollectorInfoProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorToken == null && p.hasCollectorToken()) { + this.collectorToken = convertFromProtoFormat(p.getCollectorToken()); + } + return this.collectorToken; + } + + @Override + public void setCollectorToken(Token token) { + maybeInitBuilder(); + if (token == null) { + builder.clearCollectorToken(); + } + this.collectorToken = token; + } + + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private void mergeLocalToBuilder() { + if (this.collectorAddr != null) { + builder.setCollectorAddr(this.collectorAddr); + } + if (this.collectorToken != null) { + builder.setCollectorToken(convertToProtoFormat(this.collectorToken)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index d76fc6f..1d3bf03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -102,6 +102,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestP import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -409,6 +410,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { generateByNewInstance(ApplicationTimeout.class); generateByNewInstance(ContainerResourceIncreaseRequest.class); generateByNewInstance(QueueConfigurations.class); + generateByNewInstance(CollectorInfo.class); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java index 1503eca..a4c1a38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java @@ -22,6 +22,7 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; @@ -37,11 +38,11 @@ public abstract class ReportNewCollectorInfoRequest { } public static ReportNewCollectorInfoRequest newInstance( - ApplicationId id, String collectorAddr) { + ApplicationId id, String collectorAddr, Token token) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList( - Arrays.asList(AppCollectorData.newInstance(id, collectorAddr))); + Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token))); return request; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 73a8abe..c07a6eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -26,23 +26,26 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -164,9 +167,13 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { builder.clearRegisteringCollectors(); for (Map.Entry<ApplicationId, AppCollectorData> entry : registeringCollectors.entrySet()) { + AppCollectorData data = entry.getValue(); builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue().getCollectorAddr())); + .setAppCollectorAddr(data.getCollectorAddr()) + .setAppCollectorToken(convertToProtoFormat(data.getCollectorToken())) + .setRmIdentifier(data.getRMIdentifier()) + .setVersion(data.getVersion())); } } @@ -267,8 +274,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { this.registeringCollectors = new HashMap<>(); for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); + Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(), + collectorToken); this.registeringCollectors.put(appId, data); } } @@ -309,6 +318,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { return ((MasterKeyPBImpl)t).getProto(); } + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + @Override public Set<NodeLabel> getNodeLabels() { initNodeLabels(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 0c93c30..eb03e2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -26,32 +26,35 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -158,6 +161,8 @@ public class NodeHeartbeatResponsePBImpl extends builder.addAppCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) .setAppCollectorAddr(data.getCollectorAddr()) + .setAppCollectorToken( + convertToProtoFormat(entry.getValue().getCollectorToken())) .setRmIdentifier(data.getRMIdentifier()) .setVersion(data.getVersion())); } @@ -663,8 +668,10 @@ public class NodeHeartbeatResponsePBImpl extends this.appCollectorsMap = new HashMap<>(); for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); + Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken()); AppCollectorData data = AppCollectorData.newInstance(appId, - c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(), + collectorToken); this.appCollectorsMap.put(appId, data); } } @@ -844,5 +851,13 @@ public class NodeHeartbeatResponsePBImpl extends SignalContainerRequest t) { return ((SignalContainerRequestPBImpl)t).getProto(); } + + private TokenProto convertToProtoFormat(Token t) { + return ((TokenPBImpl) t).getProto(); + } + + private TokenPBImpl convertFromProtoFormat(TokenProto p) { + return new TokenPBImpl(p); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java index 3f3dcf5..5ffc3a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; -import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; public class ReportNewCollectorInfoRequestPBImpl extends ReportNewCollectorInfoRequest { http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java index da2e5de..5266dca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.records; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.util.Records; @@ -31,20 +32,32 @@ public abstract class AppCollectorData { protected static final long DEFAULT_TIMESTAMP_VALUE = -1; public static AppCollectorData newInstance( - ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + ApplicationId id, String collectorAddr, long rmIdentifier, long version, + Token token) { AppCollectorData appCollectorData = Records.newRecord(AppCollectorData.class); appCollectorData.setApplicationId(id); appCollectorData.setCollectorAddr(collectorAddr); appCollectorData.setRMIdentifier(rmIdentifier); appCollectorData.setVersion(version); + appCollectorData.setCollectorToken(token); return appCollectorData; } + public static AppCollectorData newInstance( + ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + return newInstance(id, collectorAddr, rmIdentifier, version, null); + } + public static AppCollectorData newInstance(ApplicationId id, - String collectorAddr) { + String collectorAddr, Token token) { return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE, - DEFAULT_TIMESTAMP_VALUE); + DEFAULT_TIMESTAMP_VALUE, token); + } + + public static AppCollectorData newInstance(ApplicationId id, + String collectorAddr) { + return newInstance(id, collectorAddr, null); } /** @@ -101,4 +114,12 @@ public abstract class AppCollectorData { public abstract void setVersion(long version); + /** + * Get delegation token for app collector which AM will use to publish + * entities. + * @return the delegation token for app collector. + */ + public abstract Token getCollectorToken(); + + public abstract void setCollectorToken(Token token); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java index 7d3a805..7144f51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java @@ -19,10 +19,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; - +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder; @@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData { private String collectorAddr = null; private Long rmIdentifier = null; private Long version = null; + private Token collectorToken = null; public AppCollectorDataPBImpl() { builder = AppCollectorDataProto.newBuilder(); @@ -158,6 +160,24 @@ public class AppCollectorDataPBImpl extends AppCollectorData { builder.setVersion(version); } + @Override + public Token getCollectorToken() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorToken == null && p.hasAppCollectorToken()) { + this.collectorToken = new TokenPBImpl(p.getAppCollectorToken()); + } + return this.collectorToken; + } + + @Override + public void setCollectorToken(Token token) { + maybeInitBuilder(); + if (token == null) { + builder.clearAppCollectorToken(); + } + this.collectorToken = token; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -195,6 +215,9 @@ public class AppCollectorDataPBImpl extends AppCollectorData { if (this.version != null) { builder.setVersion(this.version); } + if (this.collectorToken != null) { + builder.setAppCollectorToken( + ((TokenPBImpl)this.collectorToken).getProto()); + } } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 58874a3..3072867 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -22,6 +22,7 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; package hadoop.yarn; +import "Security.proto"; import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; import "yarn_service_protos.proto"; @@ -140,6 +141,7 @@ message AppCollectorDataProto { optional string app_collector_addr = 2; optional int64 rm_identifier = 3 [default = -1]; optional int64 version = 4 [default = -1]; + optional hadoop.common.TokenProto app_collector_token = 5; } ////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 8291197..82dfaea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; @@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; @@ -93,6 +95,21 @@ public class TestRPC { "collectors' number in ReportNewCollectorInfoRequest is not ONE."; public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0"; + private static final Token DEFAULT_COLLECTOR_TOKEN; + static { + TimelineDelegationTokenIdentifier identifier = + new TimelineDelegationTokenIdentifier(); + identifier.setOwner(new Text("user")); + identifier.setRenewer(new Text("user")); + identifier.setRealUser(new Text("user")); + long now = Time.now(); + identifier.setIssueDate(now); + identifier.setMaxDate(now + 1000L); + identifier.setMasterKeyId(500); + identifier.setSequenceNumber(5); + DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(), + identifier.getKind().toString(), identifier.getBytes(), "localhost:0"); + } public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, 0); @@ -173,7 +190,16 @@ public class TestRPC { try { ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest.newInstance( - DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR); + DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null); + proxy.reportNewCollectorInfo(request); + } catch (YarnException e) { + Assert.fail("RPC call failured is not expected here."); + } + + try { + ReportNewCollectorInfoRequest request = + ReportNewCollectorInfoRequest.newInstance( + DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN); proxy.reportNewCollectorInfo(request); } catch (YarnException e) { Assert.fail("RPC call failured is not expected here."); @@ -437,6 +463,8 @@ public class TestRPC { DEFAULT_APP_ID); Assert.assertEquals(appCollector.getCollectorAddr(), DEFAULT_COLLECTOR_ADDR); + Assert.assertTrue(appCollector.getCollectorToken() == null || + appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN)); } else { throw new YarnException(ILLEGAL_NUMBER_MESSAGE); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 15071f3..a1890dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -351,7 +352,8 @@ public class TestYarnServerApiClasses { private Map<ApplicationId, AppCollectorData> getCollectors() { ApplicationId appID = ApplicationId.newInstance(1L, 1); String collectorAddr = "localhost:0"; - AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); + AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr, + Token.newInstance(new byte[0], "kind", new byte[0], "s")); Map<ApplicationId, AppCollectorData> collectorMap = new HashMap<>(); collectorMap.put(appID, data); http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index e33d7e1..c49d6e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -418,10 +418,11 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, new ArrayList<ContainerStatus>(), - containerList, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, - 1, null, new ArrayList<NMToken>(), newAMRMToken, - new ArrayList<UpdatedContainer>()); + return AllocateResponse.newInstance(0, + new ArrayList<ContainerStatus>(), containerList, + new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList<NMToken>(), newAMRMToken, + new ArrayList<UpdatedContainer>(), null); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 2942c13..65bde63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; - import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 3ff2ada..39be7a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; -import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 122e4a7..988ad9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -45,8 +45,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - - +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 618433b..c8cff80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; @@ -299,9 +300,9 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { // add collector address for this application if (YarnConfiguration.timelineServiceV2Enabled( getRmContext().getYarnConfiguration())) { - AppCollectorData data = app.getCollectorData(); - if (data != null) { - response.setCollectorAddr(data.getCollectorAddr()); + CollectorInfo collectorInfo = app.getCollectorInfo(); + if (collectorInfo != null) { + response.setCollectorInfo(collectorInfo); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 1a0b920..93c41b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -187,14 +188,24 @@ public interface RMApp extends EventHandler<RMAppEvent> { * only if the timeline service v.2 is enabled. * * @return the data for the application's collector, including collector - * address, collector ID. Return null if the timeline service v.2 is not - * enabled. + * address, RM ID, version and collector token. Return null if the timeline + * service v.2 is not enabled. */ @InterfaceAudience.Private @InterfaceStability.Unstable AppCollectorData getCollectorData(); /** + * The timeline collector information to be sent to AM. It should be used + * only if the timeline service v.2 is enabled. + * + * @return collector info, including collector address and collector token. + * Return null if the timeline service v.2 is not enabled. + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + CollectorInfo getCollectorInfo(); + /** * The original tracking url for the application master. * @return the original tracking url for the application master. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/06105e32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 61aebfe..7526ea3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeout; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -163,6 +164,7 @@ public class RMAppImpl implements RMApp, Recoverable { private int firstAttemptIdInStateStore = 1; private int nextAttemptId = 1; private AppCollectorData collectorData; + private CollectorInfo collectorInfo; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -526,7 +528,7 @@ public class RMAppImpl implements RMApp, Recoverable { */ public void startTimelineCollector() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(applicationId); + new AppLevelTimelineCollector(applicationId, user); rmContext.getRMTimelineCollectorManager().putIfAbsent( applicationId, collector); } @@ -614,6 +616,12 @@ public class RMAppImpl implements RMApp, Recoverable { public void setCollectorData(AppCollectorData incomingData) { this.collectorData = incomingData; + this.collectorInfo = CollectorInfo.newInstance( + incomingData.getCollectorAddr(), incomingData.getCollectorToken()); + } + + public CollectorInfo getCollectorInfo() { + return this.collectorInfo; } public void removeCollectorData() { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org