This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 78136133fbe [FLINK-31109][Yarn] Support Hadoop proxy user when delegation token fetch is disabled. This closes #22009
78136133fbe is described below

commit 78136133fbec4ca145dec66d4bc0c324c8e16d82
Author: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
AuthorDate: Sun Feb 26 12:07:22 2023 -0800

    [FLINK-31109][Yarn] Support Hadoop proxy user when delegation token fetch is disabled. This closes #22009
    
    * [FLINK-31109][yarn] Support Hadoop proxy user when delegation token fetch is disabled
---
 .../runtime/security/modules/HadoopModule.java     | 17 ++++++-
 .../token/hadoop/HBaseDelegationTokenProvider.java |  2 +-
 .../hadoop/HadoopFSDelegationTokenProvider.java    |  2 +-
 .../token/hadoop/KerberosLoginProvider.java        |  8 ++-
 .../runtime/security/modules/HadoopModuleTest.java | 28 +++++++++++
 .../token/hadoop/KerberosLoginProviderITCase.java  | 58 ++++++++++++++++++----
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 54 +++++++++++++-------
 7 files changed, 133 insertions(+), 36 deletions(-)
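
For context (not part of the commit itself): with this change a Flink-on-YARN job can be submitted as a Hadoop proxy user, provided delegation token fetching is managed outside of Flink and therefore disabled in the Flink configuration. A minimal sketch of such a submission, assuming the usual Hadoop proxy-user (superuser impersonation) settings are already in place; the user name "alice" and the deployCluster helper are placeholders, not part of the commit:

    import java.security.PrivilegedExceptionAction;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.SecurityOptions;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class ProxyUserSubmissionSketch {

        public static void main(String[] args) throws Exception {
            // Token fetching must happen outside of Flink for proxy users, so disable it here.
            Configuration flinkConf = new Configuration();
            flinkConf.set(SecurityOptions.DELEGATION_TOKENS_ENABLED, false);

            // The kerberos-authenticated real user impersonates the proxy user "alice".
            UserGroupInformation realUser = UserGroupInformation.getLoginUser();
            UserGroupInformation proxyUser = UserGroupInformation.createProxyUser("alice", realUser);

            proxyUser.doAs(
                    (PrivilegedExceptionAction<Void>)
                            () -> {
                                // Placeholder for the actual submission call, e.g. via
                                // YarnClusterDescriptor; not part of this sketch.
                                deployCluster(flinkConf);
                                return null;
                            });
        }

        private static void deployCluster(Configuration flinkConf) {
            // illustrative stub only
        }
    }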

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 9f0bba6ea90..dfc2a7ee3fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.security.modules;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.hadoop.HadoopUserUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
@@ -70,10 +71,22 @@ public class HadoopModule implements SecurityModule {
 
         try {
             KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig);
-            if (kerberosLoginProvider.isLoginPossible()) {
-                kerberosLoginProvider.doLogin();
+            if (kerberosLoginProvider.isLoginPossible(true)) {
+                kerberosLoginProvider.doLogin(true);
                 loginUser = UserGroupInformation.getLoginUser();
 
+                if (HadoopUserUtils.isProxyUser((loginUser))
+                        && securityConfig
+                                .getFlinkConfig()
+                                .getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED)) {
+                    throw new UnsupportedOperationException(
+                            "Hadoop Proxy user is supported only when"
+                                    + " delegation tokens fetch is managed outside of Flink!"
+                                    + " Please try again with "
+                                    + SecurityOptions.DELEGATION_TOKENS_ENABLED.key()
+                                    + " config set to false!");
+                }
+
                 if (loginUser.isFromKeytab()) {
                     String fileLocation =
                             System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index cdabe8a9d12..bb7f87f4e71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -116,7 +116,7 @@ public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
             return false;
         }
         return hbaseConf.get("hbase.security.authentication").equals("kerberos")
-                && kerberosLoginProvider.isLoginPossible();
+                && kerberosLoginProvider.isLoginPossible(false);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index 960a80509d2..aeb37fefdcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -99,7 +99,7 @@ public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider
             return false;
         }
         return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())
-                && kerberosLoginProvider.isLoginPossible();
+                && kerberosLoginProvider.isLoginPossible(false);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
index 4851a4f470c..5a94fc3d1ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
@@ -59,7 +59,7 @@ public class KerberosLoginProvider {
         this.useTicketCache = securityConfiguration.useTicketCache();
     }
 
-    public boolean isLoginPossible() throws IOException {
+    public boolean isLoginPossible(boolean supportProxyUser) throws IOException {
         if (UserGroupInformation.isSecurityEnabled()) {
             LOG.debug("Security is enabled");
         } else {
@@ -77,6 +77,8 @@ public class KerberosLoginProvider {
                 LOG.debug("Login from ticket cache is possible");
                 return true;
             }
+        } else if (supportProxyUser) {
+            return true;
         } else {
             throwProxyUserNotSupported();
         }
@@ -89,7 +91,7 @@ public class KerberosLoginProvider {
     /**
      * Does kerberos login and sets current user. Must be called when isLoginPossible returns true.
      */
-    public void doLogin() throws IOException {
+    public void doLogin(boolean supportProxyUser) throws IOException {
         if (principal != null) {
             LOG.info(
                     "Attempting to login to KDC using principal: {} keytab: 
{}", principal, keytab);
@@ -99,6 +101,8 @@ public class KerberosLoginProvider {
             LOG.info("Attempting to load user's ticket cache");
             UserGroupInformation.loginUserFromSubject(null);
             LOG.info("Loaded user's ticket cache successfully");
+        } else if (supportProxyUser) {
+            LOG.info("Proxy user doesn't need login since it must have 
credentials already");
         } else {
             throwProxyUserNotSupported();
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java
index 959da264496..ee26e3c6e4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java
@@ -19,17 +19,23 @@
 package org.apache.flink.runtime.security.modules;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 
 import java.io.IOException;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /** Test for {@link HadoopModule}. */
 class HadoopModuleTest {
@@ -51,4 +57,26 @@ class HadoopModuleTest {
 
         verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab();
     }
+
+    @Test
+    public void hadoopProxyUserSetWithDelegationTokensEnabledShouldThrow() {
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+            ugi.when(UserGroupInformation::getLoginUser).thenReturn(userGroupInformation);
+            when(userGroupInformation.getAuthenticationMethod())
+                    .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+            Configuration flinkConf = new Configuration();
+            flinkConf.set(SecurityOptions.DELEGATION_TOKENS_ENABLED, true);
+            SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf);
+            org.apache.hadoop.conf.Configuration hadoopConf =
+                    new org.apache.hadoop.conf.Configuration();
+            HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf);
+            Exception exception =
+                    assertThrows(
+                            SecurityModule.SecurityInstallException.class, hadoopModule::install);
+            assertTrue(exception.getCause() instanceof UnsupportedOperationException);
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
index a7dde7c1c2d..b0cd1c055a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
@@ -32,6 +32,7 @@ import java.nio.file.Path;
 import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
 import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
 import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -57,7 +58,7 @@ public class KerberosLoginProviderITCase {
             UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            assertFalse(kerberosLoginProvider.isLoginPossible());
+            assertFalse(kerberosLoginProvider.isLoginPossible(false));
         }
     }
 
@@ -71,7 +72,7 @@ public class KerberosLoginProviderITCase {
             ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(false);
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            assertFalse(kerberosLoginProvider.isLoginPossible());
+            assertFalse(kerberosLoginProvider.isLoginPossible(false));
         }
     }
 
@@ -88,7 +89,7 @@ public class KerberosLoginProviderITCase {
             ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            assertTrue(kerberosLoginProvider.isLoginPossible());
+            assertTrue(kerberosLoginProvider.isLoginPossible(false));
         }
     }
 
@@ -104,12 +105,12 @@ public class KerberosLoginProviderITCase {
             ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            assertTrue(kerberosLoginProvider.isLoginPossible());
+            assertTrue(kerberosLoginProvider.isLoginPossible(false));
         }
     }
 
     @Test
-    public void isLoginPossibleMustThrowExceptionWithProxyUser() {
+    public void isLoginPossibleMustThrowExceptionWithNoProxyUserSupport() {
         Configuration configuration = new Configuration();
         KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
 
@@ -121,7 +122,24 @@ public class KerberosLoginProviderITCase {
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
             assertThrows(
-                    UnsupportedOperationException.class, kerberosLoginProvider::isLoginPossible);
+                    UnsupportedOperationException.class,
+                    () -> kerberosLoginProvider.isLoginPossible(false));
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustReturnTrueWithProxyUserSupport() throws IOException {
+        Configuration configuration = new Configuration();
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+            ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertTrue(kerberosLoginProvider.isLoginPossible(true));
         }
     }
 
@@ -137,7 +155,7 @@ public class KerberosLoginProviderITCase {
             UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            kerberosLoginProvider.doLogin();
+            kerberosLoginProvider.doLogin(false);
             ugi.verify(() -> UserGroupInformation.loginUserFromKeytab(anyString(), anyString()));
         }
     }
@@ -153,13 +171,30 @@ public class KerberosLoginProviderITCase {
             when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            kerberosLoginProvider.doLogin();
+            kerberosLoginProvider.doLogin(false);
             ugi.verify(() -> UserGroupInformation.loginUserFromSubject(null));
         }
     }
 
     @Test
-    public void doLoginMustThrowExceptionWithProxyUser() {
+    public void doLoginMustThrowExceptionWithNoProxyUserSupport() {
+        Configuration configuration = new Configuration();
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
+        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> kerberosLoginProvider.doLogin(false));
+        }
+    }
+
+    @Test
+    public void doLoginMustNotThrowExceptionWithProxyUserSupport() {
         Configuration configuration = new Configuration();
         KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
 
@@ -169,7 +204,8 @@ public class KerberosLoginProviderITCase {
                     .thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
             ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            assertThrows(UnsupportedOperationException.class, kerberosLoginProvider::doLogin);
+            assertDoesNotThrow(
+                    () -> kerberosLoginProvider.doLogin(true), "Proxy user is not supported");
         }
     }
 
@@ -210,7 +246,7 @@ public class KerberosLoginProviderITCase {
     }
 
     @Test
-    public void doLoginAndReturnUGIMustThrowExceptionWithProxyUser() {
+    public void doLoginAndReturnUGIMustThrowExceptionWithNoProxyUserSupport() {
         Configuration configuration = new Configuration();
         KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 3f4d121ca71..1b5cec84654 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -76,6 +76,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -1151,14 +1153,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         final ContainerLaunchContext amContainer =
                 setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
 
-        if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
-            KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
-            if (kerberosLoginProvider.isLoginPossible()) {
-                setTokensFor(amContainer);
-            } else {
-                LOG.info(
-                        "Cannot use kerberos delegation token manager, no valid kerberos credentials provided.");
-            }
+        boolean fetchToken = configuration.getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED);
+        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
+        if (kerberosLoginProvider.isLoginPossible(true)) {
+            setTokensFor(amContainer, fetchToken);
+        } else {
+            LOG.info(
+                    "Cannot use kerberos delegation token manager, no valid kerberos credentials provided.");
         }
 
         amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
@@ -1292,21 +1293,36 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
                         });
     }
 
-    private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception {
-        LOG.info("Adding delegation tokens to the AM container.");
+    private void setTokensFor(ContainerLaunchContext containerLaunchContext, boolean fetchToken)
+            throws Exception {
+        Credentials credentials = new Credentials();
+
+        LOG.info("Loading delegation tokens available locally to add to the AM container");
+        // for user
+        UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
 
-        DelegationTokenManager delegationTokenManager =
-                new DefaultDelegationTokenManager(flinkConfiguration, null, null, null);
-        DelegationTokenContainer container = new DelegationTokenContainer();
-        delegationTokenManager.obtainDelegationTokens(container);
+        Collection<Token<? extends TokenIdentifier>> usrTok =
+                currUsr.getCredentials().getAllTokens();
+        for (Token<? extends TokenIdentifier> token : usrTok) {
+            LOG.info("Adding user token " + token.getService() + " with " + 
token);
+            credentials.addToken(token.getService(), token);
+        }
 
-        // This is here for backward compatibility to make log aggregation work
-        Credentials credentials = new Credentials();
-        for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) {
-            if (e.getKey().equals("hadoopfs")) {
-                credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
+        if (fetchToken) {
+            LOG.info("Fetching delegation tokens to add to the AM container.");
+            DelegationTokenManager delegationTokenManager =
+                    new DefaultDelegationTokenManager(flinkConfiguration, null, null, null);
+            DelegationTokenContainer container = new DelegationTokenContainer();
+            delegationTokenManager.obtainDelegationTokens(container);
+
+            // This is here for backward compatibility to make log aggregation work
+            for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) {
+                if (e.getKey().equals("hadoopfs")) {
+                    credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
+                }
             }
         }
+
         ByteBuffer tokens = ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials));
         containerLaunchContext.setTokens(tokens);
 

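Reader's note (not part of the commit): the new boolean parameter on KerberosLoginProvider separates call sites that may accept a Hadoop proxy user (HadoopModule and YarnClusterDescriptor, which pass true) from the delegation token providers, which keep the old behaviour by passing false. A minimal usage sketch under that assumption; the class name and standalone main method are illustrative only:

    import java.io.IOException;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;

    public final class KerberosLoginProviderUsageSketch {

        public static void main(String[] args) throws IOException {
            Configuration flinkConfig = new Configuration();
            KerberosLoginProvider loginProvider = new KerberosLoginProvider(flinkConfig);

            // Delegation token providers reject proxy users, exactly as before this change.
            boolean tokenFetchPossible = loginProvider.isLoginPossible(false);

            // With proxy-user support enabled, a proxy user passes the check and
            // doLogin(true) is a no-op for it, since it already carries credentials.
            if (loginProvider.isLoginPossible(true)) {
                loginProvider.doLogin(true);
            }
        }
    }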