dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r847258427


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);

Review Comment:
   I'd love to avoid adding a new executor here (in general, we try to avoid thread pollution where possible). We should be able to reuse the executors that are already available to the `ResourceManager`:
   - `ComponentMainThreadExecutor`: the main thread of the `ResourceManager` endpoint. It can be used for scheduling, but it must not run blocking runnables, otherwise we'd disrupt the endpoint's availability.
   - `ioExecutor`: for blocking, IO-heavy calls.
   
   Basically the start method would become something along these lines:
   
   ```java
   final long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
   // The renewal future needs to be cancelled during close.
   final ScheduledFuture<?> renewalFuture =
           mainThreadExecutor.schedule(
                   () ->
                           ioExecutor.execute(
                                   () -> {
                                       // Todo...
                                   }),
                   tgtRenewalPeriod,
                   TimeUnit.MILLISECONDS);
   ```
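   
   For illustration, the same pattern with plain `java.util.concurrent` types could look like the sketch below. It is self-contained and only an assumption about how this might be wired: `RenewalSketch`, `scheduler` (standing in for the main thread executor) and `blockingRenewal` are hypothetical names, not existing Flink code. Re-arming the timer after each run is also an addition on my side, since `schedule` fires only once.
   
   ```java
   import java.util.concurrent.Executor;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical sketch: schedule on one executor, run the blocking work on another. */
   final class RenewalSketch implements AutoCloseable {
       private final ScheduledExecutorService scheduler; // stand-in for the main thread executor
       private final Executor ioExecutor; // runs the blocking renewal call
       private final long periodMillis;
       private final Runnable blockingRenewal; // assumed to wrap the actual TGT renewal
       private ScheduledFuture<?> renewalFuture;
       private boolean closed;
   
       RenewalSketch(
               ScheduledExecutorService scheduler,
               Executor ioExecutor,
               long periodMillis,
               Runnable blockingRenewal) {
           this.scheduler = scheduler;
           this.ioExecutor = ioExecutor;
           this.periodMillis = periodMillis;
           this.blockingRenewal = blockingRenewal;
       }
   
       void start() {
           scheduleNext();
       }
   
       private synchronized void scheduleNext() {
           if (closed) {
               return;
           }
           // The scheduling thread only hands the work over; it never blocks itself.
           renewalFuture =
                   scheduler.schedule(
                           () -> ioExecutor.execute(this::renewAndReschedule),
                           periodMillis,
                           TimeUnit.MILLISECONDS);
       }
   
       private void renewAndReschedule() {
           try {
               blockingRenewal.run();
           } catch (RuntimeException e) {
               System.err.println("Renewal failed: " + e);
           } finally {
               // Re-arm the timer so that renewal keeps running until close().
               scheduleNext();
           }
       }
   
       @Override
       public synchronized void close() {
           closed = true;
           if (renewalFuture != null) {
               // Cancels a renewal that is scheduled but has not fired yet.
               renewalFuture.cancel(false);
           }
       }
   }
   ```
   
   With this shape no dedicated renewal thread is needed, and `close()` only has to cancel the pending future instead of shutting down an executor.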
   
   WDYT?
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =

Review Comment:
   nit: Please mark variables as `final` where possible.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -79,4 +93,48 @@ public void testAllProvidersLoaded() {
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
     }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackFalseByDefault() throws IOException {
+        UserGroupInformation ugi = PowerMockito.mock(UserGroupInformation.class);
+        PowerMockito.mockStatic(UserGroupInformation.class);
+        when(UserGroupInformation.getCurrentUser()).thenReturn(ugi);
+
+        ExceptionThrowingDelegationTokenProvider.enabled = false;
+        Configuration configuration = new Configuration();
+        KerberosDelegationTokenManager delegationTokenManager =
+                new KerberosDelegationTokenManager(configuration);
+
+        assertFalse(delegationTokenManager.isRenewalPossible());
+    }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackTrueWhenKeytab() throws IOException {

Review Comment:
   We should use JUnit 5's `@TempDir` support for temporary files, e.g.:
   
   ```suggestion
       public void isRenewalPossibleMustGiveBackTrueWhenKeytab(@TempDir Path tmpDir) throws IOException {
           final Path file = Files.createFile(tmpDir.resolve("test.keytab"));
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;

Review Comment:
   We shouldn't add any new tests that use `Mockito` for mocking. The reasoning behind this decision is explained in more detail here: 
https://docs.google.com/presentation/d/1fZlTjOJscwmzYadPGl23aui6zopl94Mn5smG-rB0qT8/edit#slide=id.g2fa61f7d00_0_99
   
   Since `UserGroupInformation` currently requires mocking of static methods, you'll most likely need to hide it behind a new interface that can be replaced in tests.
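   
   A rough sketch of what I mean, under the assumption that only the credentials check needs to be abstracted. `KerberosLoginProvider` and `RenewalCheck` are hypothetical names used for illustration, and `RenewalCheck` is a simplified stand-in for the manager rather than the actual class:
   
   ```java
   import java.io.IOException;
   
   /** Hypothetical abstraction over the static UserGroupInformation calls. */
   interface KerberosLoginProvider {
       // In production this could delegate to
       // UserGroupInformation.getCurrentUser().hasKerberosCredentials().
       boolean hasKerberosCredentials() throws IOException;
   }
   
   /** Simplified, hypothetical stand-in for the renewal check in the manager. */
   final class RenewalCheck {
       private final String keytab;
       private final String principal;
       private final boolean useTicketCache;
       private final KerberosLoginProvider loginProvider;
   
       RenewalCheck(
               String keytab,
               String principal,
               boolean useTicketCache,
               KerberosLoginProvider loginProvider) {
           this.keytab = keytab;
           this.principal = principal;
           this.useTicketCache = useTicketCache;
           this.loginProvider = loginProvider;
       }
   
       boolean isRenewalPossible() throws IOException {
           // A keytab login needs both the keytab and the principal.
           if (!isBlank(keytab) && !isBlank(principal)) {
               return true;
           }
           // A ticket cache login needs existing Kerberos credentials.
           return useTicketCache && loginProvider.hasKerberosCredentials();
       }
   
       private static boolean isBlank(String value) {
           return value == null || value.trim().isEmpty();
       }
   }
   ```
   
   A test can then pass a lambda such as `() -> true` as the provider, so neither `Mockito` nor `PowerMockito` is needed.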
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
+        // By default, a cancelled task is not automatically removed from the work queue until its
+        // delay elapses. We have to enable it manually.
+        renewalExecutor.setRemoveOnCancelPolicy(true);
+
+        startTGTRenewal();
+    }
+
+    @VisibleForTesting
+    boolean isRenewalPossible() throws IOException {
+        if (!StringUtils.isBlank(securityConfiguration.getKeytab())
+                && !StringUtils.isBlank(securityConfiguration.getPrincipal())) {
+            LOG.debug("Login from keytab is possible");
+            return true;
+        }
+        LOG.debug("Login from keytab is NOT possible");
+
+        if (securityConfiguration.useTicketCache()
+                && UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
+            LOG.debug("Login from ticket cache is possible");
+            return true;
+        }
+        LOG.debug("Login from ticket cache is NOT possible");
+
+        return false;
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            Runnable tgtRenewalTask =
+                    () -> {
+                        try {
+                            LOG.debug("Renewing TGT");
+                            currentUser.checkTGTAndReloginFromKeytab();
+                            LOG.debug("TGT renewed successfully");
+                        } catch (Exception e) {
+                            LOG.error("Error while renewing TGT", e);

Review Comment:
   Should this be logged at warning level instead of error?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java:
##########
@@ -20,13 +20,27 @@
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;

Review Comment:
   Please use JUnit 5 here.


