This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new 29f4181e107 [FLINK-31109][Yarn] Support Hadoop proxy user when delegation token fetch is disabled. This closes #22009 29f4181e107 is described below commit 29f4181e1076712dcaeaadeaad0c1bcb2ef25b70 Author: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com> AuthorDate: Sun Feb 26 12:07:22 2023 -0800 [FLINK-31109][Yarn] Support Hadoop proxy user when delegation token fetch is disabled. This closes #22009 * [FLINK-31109][yarn] Support Hadoop proxy user when delegation token fetch is disabled (cherry picked from commit 78136133fbec4ca145dec66d4bc0c324c8e16d82) --- .../runtime/security/modules/HadoopModule.java | 17 ++++++- .../token/hadoop/HBaseDelegationTokenProvider.java | 2 +- .../hadoop/HadoopFSDelegationTokenProvider.java | 2 +- .../token/hadoop/KerberosLoginProvider.java | 8 ++- .../runtime/security/modules/HadoopModuleTest.java | 28 +++++++++++ .../token/hadoop/KerberosLoginProviderITCase.java | 58 ++++++++++++++++++---- .../apache/flink/yarn/YarnClusterDescriptor.java | 54 +++++++++++++------- 7 files changed, 133 insertions(+), 36 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index 9f0bba6ea90..dfc2a7ee3fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.security.modules; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.hadoop.HadoopUserUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider; @@ -70,10 +71,22 @@ public class HadoopModule implements SecurityModule { try { KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig); - if (kerberosLoginProvider.isLoginPossible()) { - kerberosLoginProvider.doLogin(); + if (kerberosLoginProvider.isLoginPossible(true)) { + kerberosLoginProvider.doLogin(true); loginUser = UserGroupInformation.getLoginUser(); + if (HadoopUserUtils.isProxyUser((loginUser)) + && securityConfig + .getFlinkConfig() + .getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED)) { + throw new UnsupportedOperationException( + "Hadoop Proxy user is supported only when" + + " delegation tokens fetch is managed outside of Flink!" + + " Please try again with " + + SecurityOptions.DELEGATION_TOKENS_ENABLED.key() + + " config set to false!"); + } + if (loginUser.isFromKeytab()) { String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java index cdabe8a9d12..bb7f87f4e71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java @@ -116,7 +116,7 @@ public class HBaseDelegationTokenProvider implements DelegationTokenProvider { return false; } return hbaseConf.get("hbase.security.authentication").equals("kerberos") - && kerberosLoginProvider.isLoginPossible(); + && kerberosLoginProvider.isLoginPossible(false); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java index 960a80509d2..aeb37fefdcb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java @@ -99,7 +99,7 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider return false; } return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser()) - && kerberosLoginProvider.isLoginPossible(); + && kerberosLoginProvider.isLoginPossible(false); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java index 4851a4f470c..5a94fc3d1ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java @@ -59,7 +59,7 @@ public class KerberosLoginProvider { this.useTicketCache = securityConfiguration.useTicketCache(); } - public boolean isLoginPossible() throws IOException { + public boolean isLoginPossible(boolean supportProxyUser) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { LOG.debug("Security is enabled"); } else { @@ -77,6 +77,8 @@ public class KerberosLoginProvider { LOG.debug("Login from ticket cache is possible"); return true; } + } else if (supportProxyUser) { + return true; } else { throwProxyUserNotSupported(); } @@ -89,7 +91,7 @@ public class KerberosLoginProvider { /** * Does kerberos login and sets current user. Must be called when isLoginPossible returns true. */ - public void doLogin() throws IOException { + public void doLogin(boolean supportProxyUser) throws IOException { if (principal != null) { LOG.info( "Attempting to login to KDC using principal: {} keytab: {}", principal, keytab); @@ -99,6 +101,8 @@ public class KerberosLoginProvider { LOG.info("Attempting to load user's ticket cache"); UserGroupInformation.loginUserFromSubject(null); LOG.info("Loaded user's ticket cache successfully"); + } else if (supportProxyUser) { + LOG.info("Proxy user doesn't need login since it must have credentials already"); } else { throwProxyUserNotSupported(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java index 959da264496..ee26e3c6e4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java @@ -19,17 +19,23 @@ package org.apache.flink.runtime.security.modules; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; import java.io.IOException; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** Test for {@link HadoopModule}. */ class HadoopModuleTest { @@ -51,4 +57,26 @@ class HadoopModuleTest { verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab(); } + + @Test + public void hadoopProxyUserSetWithDelegationTokensEnabledShouldThrow() { + try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { + UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); + ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true); + ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); + ugi.when(UserGroupInformation::getLoginUser).thenReturn(userGroupInformation); + when(userGroupInformation.getAuthenticationMethod()) + .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY); + Configuration flinkConf = new Configuration(); + flinkConf.set(SecurityOptions.DELEGATION_TOKENS_ENABLED, true); + SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf); + org.apache.hadoop.conf.Configuration hadoopConf = + new org.apache.hadoop.conf.Configuration(); + HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf); + Exception exception = + assertThrows( + SecurityModule.SecurityInstallException.class, hadoopModule::install); + assertTrue(exception.getCause() instanceof UnsupportedOperationException); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java index a7dde7c1c2d..b0cd1c055a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java @@ -32,6 +32,7 @@ import java.nio.file.Path; import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB; import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL; import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -57,7 +58,7 @@ public class KerberosLoginProviderITCase { UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - assertFalse(kerberosLoginProvider.isLoginPossible()); + assertFalse(kerberosLoginProvider.isLoginPossible(false)); } } @@ -71,7 +72,7 @@ public class KerberosLoginProviderITCase { ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(false); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - assertFalse(kerberosLoginProvider.isLoginPossible()); + assertFalse(kerberosLoginProvider.isLoginPossible(false)); } } @@ -88,7 +89,7 @@ public class KerberosLoginProviderITCase { ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - assertTrue(kerberosLoginProvider.isLoginPossible()); + assertTrue(kerberosLoginProvider.isLoginPossible(false)); } } @@ -104,12 +105,12 @@ public class KerberosLoginProviderITCase { ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - assertTrue(kerberosLoginProvider.isLoginPossible()); + assertTrue(kerberosLoginProvider.isLoginPossible(false)); } } @Test - public void isLoginPossibleMustThrowExceptionWithProxyUser() { + public void isLoginPossibleMustThrowExceptionWithNoProxyUserSupport() { Configuration configuration = new Configuration(); KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); @@ -121,7 +122,24 @@ public class KerberosLoginProviderITCase { ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); assertThrows( - UnsupportedOperationException.class, kerberosLoginProvider::isLoginPossible); + UnsupportedOperationException.class, + () -> kerberosLoginProvider.isLoginPossible(false)); + } + } + + @Test + public void isLoginPossibleMustReturnTrueWithProxyUserSupport() throws IOException { + Configuration configuration = new Configuration(); + KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); + + try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { + UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); + when(userGroupInformation.getAuthenticationMethod()) + .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY); + ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true); + ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); + + assertTrue(kerberosLoginProvider.isLoginPossible(true)); } } @@ -137,7 +155,7 @@ public class KerberosLoginProviderITCase { UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - kerberosLoginProvider.doLogin(); + kerberosLoginProvider.doLogin(false); ugi.verify(() -> UserGroupInformation.loginUserFromKeytab(anyString(), anyString())); } } @@ -153,13 +171,30 @@ public class KerberosLoginProviderITCase { when(userGroupInformation.hasKerberosCredentials()).thenReturn(true); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - kerberosLoginProvider.doLogin(); + kerberosLoginProvider.doLogin(false); ugi.verify(() -> UserGroupInformation.loginUserFromSubject(null)); } } @Test - public void doLoginMustThrowExceptionWithProxyUser() { + public void doLoginMustThrowExceptionWithNoProxyUserSupport() { + Configuration configuration = new Configuration(); + KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); + + try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { + UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); + when(userGroupInformation.getAuthenticationMethod()) + .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY); + ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); + + assertThrows( + UnsupportedOperationException.class, + () -> kerberosLoginProvider.doLogin(false)); + } + } + + @Test + public void doLoginMustNotThrowExceptionWithProxyUserSupport() { Configuration configuration = new Configuration(); KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); @@ -169,7 +204,8 @@ public class KerberosLoginProviderITCase { .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - assertThrows(UnsupportedOperationException.class, kerberosLoginProvider::doLogin); + assertDoesNotThrow( + () -> kerberosLoginProvider.doLogin(true), "Proxy user is not supported"); } } @@ -210,7 +246,7 @@ public class KerberosLoginProviderITCase { } @Test - public void doLoginAndReturnUGIMustThrowExceptionWithProxyUser() { + public void doLoginAndReturnUGIMustThrowExceptionWithNoProxyUserSupport() { Configuration configuration = new Configuration(); KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 3f4d121ca71..1b5cec84654 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -76,6 +76,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -1151,14 +1153,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { final ContainerLaunchContext amContainer = setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec); - if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) { - KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); - if (kerberosLoginProvider.isLoginPossible()) { - setTokensFor(amContainer); - } else { - LOG.info( - "Cannot use kerberos delegation token manager, no valid kerberos credentials provided."); - } + boolean fetchToken = configuration.getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED); + KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); + if (kerberosLoginProvider.isLoginPossible(true)) { + setTokensFor(amContainer, fetchToken); + } else { + LOG.info( + "Cannot use kerberos delegation token manager, no valid kerberos credentials provided."); } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); @@ -1292,21 +1293,36 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { }); } - private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception { - LOG.info("Adding delegation tokens to the AM container."); + private void setTokensFor(ContainerLaunchContext containerLaunchContext, boolean fetchToken) + throws Exception { + Credentials credentials = new Credentials(); + + LOG.info("Loading delegation tokens available locally to add to the AM container"); + // for user + UserGroupInformation currUsr = UserGroupInformation.getCurrentUser(); - DelegationTokenManager delegationTokenManager = - new DefaultDelegationTokenManager(flinkConfiguration, null, null, null); - DelegationTokenContainer container = new DelegationTokenContainer(); - delegationTokenManager.obtainDelegationTokens(container); + Collection<Token<? extends TokenIdentifier>> usrTok = + currUsr.getCredentials().getAllTokens(); + for (Token<? extends TokenIdentifier> token : usrTok) { + LOG.info("Adding user token " + token.getService() + " with " + token); + credentials.addToken(token.getService(), token); + } - // This is here for backward compatibility to make log aggregation work - Credentials credentials = new Credentials(); - for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) { - if (e.getKey().equals("hadoopfs")) { - credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue())); + if (fetchToken) { + LOG.info("Fetching delegation tokens to add to the AM container."); + DelegationTokenManager delegationTokenManager = + new DefaultDelegationTokenManager(flinkConfiguration, null, null, null); + DelegationTokenContainer container = new DelegationTokenContainer(); + delegationTokenManager.obtainDelegationTokens(container); + + // This is here for backward compatibility to make log aggregation work + for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) { + if (e.getKey().equals("hadoopfs")) { + credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue())); + } } } + ByteBuffer tokens = ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials)); containerLaunchContext.setTokens(tokens);