http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 3234d6f..f826631 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -239,6 +240,11 @@ public abstract class MockAsm extends MockApps { public boolean isAppInCompletedStates() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public CollectorInfo getCollectorInfo() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 9365e54..17cafef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -321,13 +322,13 @@ public class MockRMApp implements RMApp { return false; } - public String getCollectorAddr() { + @Override + public AppCollectorData getCollectorData() { throw new UnsupportedOperationException("Not supported yet."); } @Override - public AppCollectorData getCollectorData() { + public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 07058f6..eb4381d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -80,7 +80,7 @@ public class TestTimelineServiceClientIntegration { auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], collectorManager, conf); - auxService.addApplication(ApplicationId.newInstance(0, 1)); + auxService.addApplication(ApplicationId.newInstance(0, 1), "user"); } catch (ExitUtil.ExitException e) { fail(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index da76958..84d892d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -23,7 +23,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.BufferedReader; @@ -40,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.Text; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -51,10 +55,12 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; @@ -76,7 +82,6 @@ public class TestTimelineAuthFilterForV2 { private static final String FOO_USER = "foo"; private static final String HTTP_USER = "HTTP"; - private static final File TEST_ROOT_DIR = new File( System.getProperty("test.build.dir", "target" + File.separator + "test-dir"), UUID.randomUUID().toString()); @@ -88,21 +93,35 @@ public class TestTimelineAuthFilterForV2 { private static String httpSpnegoPrincipal = KerberosTestUtils. getServerPrincipal(); + // First param indicates whether HTTPS access or HTTP access and second param + // indicates whether it is kerberos access or token based access. @Parameterized.Parameters - public static Collection<Object[]> withSsl() { - return Arrays.asList(new Object[][] {{false}, {true}}); + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][] {{false, true}, {false, false}, + {true, false}, {true, true}}); } private static MiniKdc testMiniKDC; private static String keystoresDir; private static String sslConfDir; private static Configuration conf; + private static UserGroupInformation nonKerberosUser; + static { + try { + nonKerberosUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) {} + } + // Indicates whether HTTPS or HTTP access. private boolean withSsl; + // Indicates whether Kerberos based login is used or token based access is + // done. + private boolean withKerberosLogin; private NodeTimelineCollectorManager collectorManager; private PerNodeTimelineCollectorsAuxService auxService; - - public TestTimelineAuthFilterForV2(boolean withSsl) { + public TestTimelineAuthFilterForV2(boolean withSsl, + boolean withKerberosLogin) { this.withSsl = withSsl; + this.withKerberosLogin = withKerberosLogin; } @BeforeClass @@ -143,8 +162,6 @@ public class TestTimelineAuthFilterForV2 { conf.set("hadoop.proxyuser.HTTP.hosts", "*"); conf.set("hadoop.proxyuser.HTTP.users", FOO_USER); UserGroupInformation.setConfiguration(conf); - SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB, - YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost"); } catch (Exception e) { fail("Couldn't setup TimelineServer V2."); } @@ -166,9 +183,27 @@ public class TestTimelineAuthFilterForV2 { conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name()); } + UserGroupInformation.setConfiguration(conf); collectorManager = new DummyNodeTimelineCollectorManager(); - auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], - collectorManager, conf); + auxService = PerNodeTimelineCollectorsAuxService.launchServer( + new String[0], collectorManager, conf); + if (withKerberosLogin) { + SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB, + YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost"); + } + ApplicationId appId = ApplicationId.newInstance(0, 1); + auxService.addApplication( + appId, UserGroupInformation.getCurrentUser().getUserName()); + if (!withKerberosLogin) { + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector)collectorManager.get(appId); + org.apache.hadoop.security.token.Token + <TimelineDelegationTokenIdentifier> token = + collector.getDelegationTokenForApp(); + token.setService(new Text("localhost" + token.getService().toString(). + substring(token.getService().toString().indexOf(":")))); + UserGroupInformation.getCurrentUser().addToken(token); + } } private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) { @@ -199,9 +234,14 @@ public class TestTimelineAuthFilterForV2 { } if (withSsl) { KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir); - File base = new File(BASEDIR); - FileUtil.fullyDelete(base); + FileUtil.fullyDelete(new File(BASEDIR)); + } + if (withKerberosLogin) { + UserGroupInformation.getCurrentUser().logoutUserFromKeytab(); } + // Reset the user for next run. + UserGroupInformation.setLoginUser( + UserGroupInformation.createRemoteUser(nonKerberosUser.getUserName())); } private static TimelineEntity createEntity(String id, String type) { @@ -241,35 +281,44 @@ public class TestTimelineAuthFilterForV2 { } } + private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir, + String entityType) throws Exception { + TimelineV2Client client = createTimelineClientForUGI(appId); + try { + // Sync call. Results available immediately. + client.putEntities(createEntity("entity1", entityType)); + assertEquals(1, entityTypeDir.listFiles().length); + verifyEntity(entityTypeDir, "entity1", entityType); + // Async call. + client.putEntitiesAsync(createEntity("entity2", entityType)); + } finally { + client.stop(); + } + } + @Test public void testPutTimelineEntities() throws Exception { - final ApplicationId appId = ApplicationId.newInstance(0, 1); - auxService.addApplication(appId); final String entityType = "dummy_type"; + final ApplicationId appId = ApplicationId.newInstance(0, 1); final File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() + File.separator + "entities" + File.separator + - YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + "test_user" + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + + UserGroupInformation.getCurrentUser().getUserName() + File.separator + "test_flow_name" + File.separator + "test_flow_version" + File.separator + "1" + File.separator + appId.toString() + File.separator + entityType); try { - KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() { - @Override - public Void call() throws Exception { - TimelineV2Client client = createTimelineClientForUGI(appId); - try { - // Sync call. Results available immediately. - client.putEntities(createEntity("entity1", entityType)); - assertEquals(1, entityTypeDir.listFiles().length); - verifyEntity(entityTypeDir, "entity1", entityType); - // Async call. - client.putEntitiesAsync(createEntity("entity2", entityType)); + if (withKerberosLogin) { + KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() { + @Override + public Void call() throws Exception { + publishAndVerifyEntity(appId, entityTypeDir, entityType); return null; - } finally { - client.stop(); } - } - }); + }); + } else { + publishAndVerifyEntity(appId, entityTypeDir, entityType); + } // Wait for async entity to be published. for (int i = 0; i < 50; i++) { if (entityTypeDir.listFiles().length == 2) { @@ -279,6 +328,11 @@ public class TestTimelineAuthFilterForV2 { } assertEquals(2, entityTypeDir.listFiles().length); verifyEntity(entityTypeDir, "entity2", entityType); + AppLevelTimelineCollector collector = + (AppLevelTimelineCollector)collectorManager.get(appId); + auxService.removeApplication(appId); + verify(collectorManager.getTokenManagerService()).cancelToken( + eq(collector.getDelegationTokenForApp()), any(String.class)); } finally { FileUtils.deleteQuietly(entityTypeDir); } @@ -291,12 +345,19 @@ public class TestTimelineAuthFilterForV2 { } @Override + protected TimelineV2DelegationTokenSecretManagerService + createTokenManagerService() { + return spy(new TimelineV2DelegationTokenSecretManagerService()); + } + + @Override protected CollectorNodemanagerProtocol getNMCollectorService() { CollectorNodemanagerProtocol protocol = mock(CollectorNodemanagerProtocol.class); try { GetTimelineCollectorContextResponse response = - GetTimelineCollectorContextResponse.newInstance("test_user", + GetTimelineCollectorContextResponse.newInstance( + UserGroupInformation.getCurrentUser().getUserName(), "test_flow_name", "test_flow_version", 1L); when(protocol.getTimelineCollectorContext(any( GetTimelineCollectorContextRequest.class))).thenReturn(response); http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index c481dbe..13426b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -24,9 +24,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -41,13 +44,20 @@ public class AppLevelTimelineCollector extends TimelineCollector { private static final Log LOG = LogFactory.getLog(TimelineCollector.class); private final ApplicationId appId; + private final String appUser; private final TimelineCollectorContext context; private UserGroupInformation currentUser; + private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp; public AppLevelTimelineCollector(ApplicationId appId) { + this(appId, null); + } + + public AppLevelTimelineCollector(ApplicationId appId, String user) { super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); Preconditions.checkNotNull(appId, "AppId shouldn't be null"); this.appId = appId; + this.appUser = user; context = new TimelineCollectorContext(); } @@ -55,6 +65,20 @@ public class AppLevelTimelineCollector extends TimelineCollector { return currentUser; } + public String getAppUser() { + return appUser; + } + + void setDelegationTokenForApp( + Token<TimelineDelegationTokenIdentifier> token) { + this.delegationTokenForApp = token; + } + + @VisibleForTesting + public Token<TimelineDelegationTokenIdentifier> getDelegationTokenForApp() { + return this.delegationTokenForApp; + } + @Override protected void serviceInit(Configuration conf) throws Exception { context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID, http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java index ac91275..6c0d693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java @@ -56,8 +56,8 @@ public class AppLevelTimelineCollectorWithAgg private ScheduledThreadPoolExecutor appAggregationExecutor; private AppLevelAggregator appAggregator; - public AppLevelTimelineCollectorWithAgg(ApplicationId appId) { - super(appId); + public AppLevelTimelineCollectorWithAgg(ApplicationId appId, String user) { + super(appId, user); } private static Set<String> initializeSkipSet() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 50ebb0f..cad993d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -30,14 +30,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; @@ -71,6 +74,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { private final boolean runningAsAuxService; + private UserGroupInformation loginUGI; + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; @VisibleForTesting @@ -85,25 +90,40 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { @Override protected void serviceInit(Configuration conf) throws Exception { - tokenMgrService = new TimelineV2DelegationTokenSecretManagerService(); + tokenMgrService = createTokenManagerService(); addService(tokenMgrService); + this.loginUGI = UserGroupInformation.getCurrentUser(); super.serviceInit(conf); } @Override protected void serviceStart() throws Exception { - if (UserGroupInformation.isSecurityEnabled() && !runningAsAuxService) { + if (UserGroupInformation.isSecurityEnabled()) { // Do security login for cases where collector is running outside NM. - try { - doSecureLogin(); - } catch(IOException ie) { - throw new YarnRuntimeException("Failed to login", ie); + if (!runningAsAuxService) { + try { + doSecureLogin(); + } catch(IOException ie) { + throw new YarnRuntimeException("Failed to login", ie); + } } + this.loginUGI = UserGroupInformation.getLoginUser(); } super.serviceStart(); startWebApp(); } + protected TimelineV2DelegationTokenSecretManagerService + createTokenManagerService() { + return new TimelineV2DelegationTokenSecretManagerService(); + } + + @VisibleForTesting + public TimelineV2DelegationTokenSecretManagerService + getTokenManagerService() { + return tokenMgrService; + } + private void doSecureLogin() throws IOException { Configuration conf = getConfig(); InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed( @@ -122,13 +142,45 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { super.serviceStop(); } + @VisibleForTesting + public Token<TimelineDelegationTokenIdentifier> generateTokenForAppCollector( + String user) { + Token<TimelineDelegationTokenIdentifier> token = tokenMgrService. + generateToken(UserGroupInformation.createRemoteUser(user), + loginUGI.getShortUserName()); + token.setService(new Text(timelineRestServerBindAddress)); + return token; + } + + @VisibleForTesting + public void cancelTokenForAppCollector( + AppLevelTimelineCollector appCollector) throws IOException { + if (appCollector.getDelegationTokenForApp() != null) { + tokenMgrService.cancelToken(appCollector.getDelegationTokenForApp(), + appCollector.getAppUser()); + } + } + @Override protected void doPostPut(ApplicationId appId, TimelineCollector collector) { try { // Get context info from NM updateTimelineCollectorContext(appId, collector); + // Generate token for app collector. + org.apache.hadoop.yarn.api.records.Token token = null; + if (UserGroupInformation.isSecurityEnabled() && + collector instanceof AppLevelTimelineCollector) { + AppLevelTimelineCollector appCollector = + (AppLevelTimelineCollector)collector; + Token<TimelineDelegationTokenIdentifier> timelineToken = + generateTokenForAppCollector(appCollector.getAppUser()); + appCollector.setDelegationTokenForApp(timelineToken); + token = org.apache.hadoop.yarn.api.records.Token.newInstance( + timelineToken.getIdentifier(), timelineToken.getKind().toString(), + timelineToken.getPassword(), timelineToken.getService().toString()); + } // Report to NM if a new collector is added. - reportNewCollectorToNM(appId); + reportNewCollectorToNM(appId, token); } catch (YarnException | IOException e) { // throw exception here as it cannot be used if failed communicate with NM LOG.error("Failed to communicate with NM Collector Service for " + appId); @@ -136,6 +188,18 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { } } + @Override + protected void postRemove(ApplicationId appId, TimelineCollector collector) { + if (collector instanceof AppLevelTimelineCollector) { + try { + cancelTokenForAppCollector((AppLevelTimelineCollector)collector); + } catch (IOException e) { + LOG.warn("Failed to cancel token for app collector with appId " + + appId, e); + } + } + } + /** * Launch the REST web server for this collector manager. */ @@ -180,11 +244,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { timelineRestServerBindAddress); } - private void reportNewCollectorToNM(ApplicationId appId) + private void reportNewCollectorToNM(ApplicationId appId, + org.apache.hadoop.yarn.api.records.Token token) throws YarnException, IOException { ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest.newInstance(appId, - this.timelineRestServerBindAddress); + this.timelineRestServerBindAddress, token); LOG.info("Report a new collector for application: " + appId + " to the NM Collector Service."); getNMCollectorService().reportNewCollectorInfo(request); http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 669e752..cb48e72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -114,11 +114,12 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { * exists, no new service is created. * * @param appId Application Id to be added. + * @param user Application Master container user. * @return whether it was added successfully */ - public boolean addApplication(ApplicationId appId) { + public boolean addApplication(ApplicationId appId, String user) { AppLevelTimelineCollector collector = - new AppLevelTimelineCollectorWithAgg(appId); + new AppLevelTimelineCollectorWithAgg(appId, user); return (collectorManager.putIfAbsent(appId, collector) == collector); } @@ -147,7 +148,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { ApplicationId appId = context.getContainerId(). getApplicationAttemptId().getApplicationId(); - addApplication(appId); + addApplication(appId, context.getUser()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java index eef8436..de7db58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/security/TimelineV2DelegationTokenSecretManagerService.java @@ -18,8 +18,13 @@ package org.apache.hadoop.yarn.server.timelineservice.security; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService; @@ -43,6 +48,17 @@ public class TimelineV2DelegationTokenSecretManagerService extends tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval); } + public Token<TimelineDelegationTokenIdentifier> generateToken( + UserGroupInformation ugi, String renewer) { + return ((TimelineV2DelegationTokenSecretManager) + getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer); + } + + public void cancelToken(Token<TimelineDelegationTokenIdentifier> token, + String canceller) throws IOException { + getTimelineDelegationTokenSecretManager().cancelToken(token, canceller); + } + /** * Delegation token secret manager for ATSv2. */ @@ -70,6 +86,21 @@ public class TimelineV2DelegationTokenSecretManagerService extends delegationTokenRenewInterval, delegationTokenRemoverScanInterval); } + public Token<TimelineDelegationTokenIdentifier> generateToken( + UserGroupInformation ugi, String renewer) { + Text realUser = null; + if (ugi.getRealUser() != null) { + realUser = new Text(ugi.getRealUser().getUserName()); + } + TimelineDelegationTokenIdentifier identifier = createIdentifier(); + identifier.setOwner(new Text(ugi.getUserName())); + identifier.setRenewer(new Text(renewer)); + identifier.setRealUser(realUser); + byte[] password = createPassword(identifier); + return new Token<TimelineDelegationTokenIdentifier>(identifier.getBytes(), + password, identifier.getKind(), null); + } + @Override public TimelineDelegationTokenIdentifier createIdentifier() { return new TimelineDelegationTokenIdentifier(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/15712234/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index a59f8c1..af9acce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -95,7 +95,7 @@ public class TestNMTimelineCollectorManager { Callable<Boolean> task = new Callable<Boolean>() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollectorWithAgg(appId); + new AppLevelTimelineCollectorWithAgg(appId, "user"); return (collectorManager.putIfAbsent(appId, collector) == collector); } }; @@ -126,7 +126,7 @@ public class TestNMTimelineCollectorManager { Callable<Boolean> task = new Callable<Boolean>() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollectorWithAgg(appId); + new AppLevelTimelineCollectorWithAgg(appId, "user"); boolean successPut = (collectorManager.putIfAbsent(appId, collector) == collector); return successPut && collectorManager.remove(appId); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org