This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 150e42b  KAFKA-7945: Calc refresh time correctly when token created in 
the past (#6288)
150e42b is described below

commit 150e42b5cc76d55ca2f89f6414dbf7912d589182
Author: Ron Dagostino <rndg...@gmail.com>
AuthorDate: Wed Feb 20 16:26:41 2019 -0500

    KAFKA-7945: Calc refresh time correctly when token created in the past 
(#6288)
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../ExpiringCredentialRefreshingLogin.java         | 11 ++-
 .../ExpiringCredentialRefreshingLoginTest.java     | 96 +++++++++++++++++++++-
 2 files changed, 103 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
index dcdf3cf..ab2f303 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
@@ -81,6 +81,13 @@ public abstract class ExpiringCredentialRefreshingLogin 
implements AutoCloseable
                     loginContextFactory.refresherThreadDone();
                     return;
                 }
+                // safety check motivated by KAFKA-7945,
+                // should generally never happen except due to a bug
+                if (nextRefreshMs.longValue() < nowMs) {
+                    log.warn("[Principal={}]: Expiring credential re-login 
sleep time was calculated to be in the past! Will explicitly adjust. ({})", 
principalLogText(),
+                            new Date(nextRefreshMs));
+                    nextRefreshMs = Long.valueOf(nowMs + 10 * 1000); // 
refresh in 10 seconds
+                }
                 log.info("[Principal={}]: Expiring credential re-login 
sleeping until: {}", principalLogText(),
                         new Date(nextRefreshMs));
                 time.sleep(nextRefreshMs - nowMs);
@@ -307,7 +314,7 @@ public abstract class ExpiringCredentialRefreshingLogin 
implements AutoCloseable
             return null;
         }
         Long optionalStartTime = expiringCredential.startTimeMs();
-        long startMs = optionalStartTime != null ? 
optionalStartTime.longValue() : currentMs();
+        long startMs = optionalStartTime != null ? 
optionalStartTime.longValue() : relativeToMs;
         log.info("[Principal={}]: Expiring credential valid from {} to {}", 
expiringCredential.principalName(),
                 new java.util.Date(startMs), new java.util.Date(expireTimeMs));
 
@@ -320,7 +327,7 @@ public abstract class ExpiringCredentialRefreshingLogin 
implements AutoCloseable
         long refreshMinPeriodSeconds = 
expiringCredentialRefreshConfig.loginRefreshMinPeriodSeconds();
         long clientRefreshBufferSeconds = 
expiringCredentialRefreshConfig.loginRefreshBufferSeconds();
         if (relativeToMs + 1000L * (refreshMinPeriodSeconds + 
clientRefreshBufferSeconds) > expireTimeMs) {
-            long retvalRefreshMs = startMs + (long) ((expireTimeMs - startMs) 
* pct);
+            long retvalRefreshMs = relativeToMs + (long) ((expireTimeMs - 
relativeToMs) * pct);
             log.warn(
                     "[Principal={}]: Expiring credential expires at {}, so 
buffer times of {} and {} seconds"
                             + " at the front and back, respectively, cannot be 
accommodated.  We will refresh at {}.",
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index d0008f8..3a39242 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -84,6 +84,14 @@ public class ExpiringCredentialRefreshingLoginTest {
             this.clientReloginAllowedBeforeLogout = 
clientReloginAllowedBeforeLogout;
         }
 
+        public long getCreateMs() {
+            return time.milliseconds();
+        }
+
+        public long getExpireTimeMs() {
+            return time.milliseconds() + lifetimeMillis;
+        }
+
         /*
          * Invoke at login time
          */
@@ -130,8 +138,8 @@ public class ExpiringCredentialRefreshingLoginTest {
 
         private ExpiringCredential internalNewExpiringCredential() {
             return new ExpiringCredential() {
-                private final long createMs = time.milliseconds();
-                private final long expireTimeMs = createMs + lifetimeMillis;
+                private final long createMs = getCreateMs();
+                private final long expireTimeMs = getExpireTimeMs();
 
                 @Override
                 public String principalName() {
@@ -443,6 +451,90 @@ public class ExpiringCredentialRefreshingLoginTest {
     }
 
     @Test
+    public void 
testRefreshWithExpirationSmallerThanConfiguredBuffersAndOlderCreateTime() 
throws Exception {
+        int numExpectedRefreshes = 1;
+        boolean clientReloginAllowedBeforeLogout = true;
+        final LoginContext mockLoginContext = mock(LoginContext.class);
+        Subject subject = new Subject();
+        when(mockLoginContext.getSubject()).thenReturn(subject);
+
+        MockTime mockTime = new MockTime();
+        long startMs = mockTime.milliseconds();
+        /*
+         * Identify the lifetime of each expiring credential
+         */
+        long lifetimeMinutes = 10L;
+        /*
+         * Identify the point at which refresh will occur in that lifetime
+         */
+        long refreshEveryMinutes = 8L;
+        /*
+         * Set an absolute last refresh time that will cause the login thread 
to exit
+         * after a certain number of re-logins (by adding an extra half of a 
refresh
+         * interval).
+         */
+        long absoluteLastRefreshMs = startMs + (1 + numExpectedRefreshes) * 
1000 * 60 * refreshEveryMinutes
+                - 1000 * 60 * refreshEveryMinutes / 2;
+        /*
+         * Identify buffer time on either side for the refresh algorithm that 
will cause
+         * the entire lifetime to be taken up. In other words, make sure there 
is no way
+         * to honor the buffers.
+         */
+        short minPeriodSeconds = (short) (1 + lifetimeMinutes * 60 / 2);
+        short bufferSeconds = minPeriodSeconds;
+
+        /*
+         * Define some listeners so we can keep track of who gets done and 
when. All
+         * added listeners should end up done except the last, extra one, 
which should
+         * not.
+         */
+        MockScheduler mockScheduler = new MockScheduler(mockTime);
+        List<KafkaFutureImpl<Long>> waiters = addWaiters(mockScheduler, 1000 * 
60 * refreshEveryMinutes,
+                numExpectedRefreshes + 1);
+
+        // Create the ExpiringCredentialRefreshingLogin instance under test
+        TestLoginContextFactory testLoginContextFactory = new 
TestLoginContextFactory();
+        TestExpiringCredentialRefreshingLogin 
testExpiringCredentialRefreshingLogin = new 
TestExpiringCredentialRefreshingLogin(
+                refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime(
+                        1.0 * refreshEveryMinutes / lifetimeMinutes, 
minPeriodSeconds, bufferSeconds,
+                        clientReloginAllowedBeforeLogout),
+                testLoginContextFactory, mockTime, 1000 * 60 * 
lifetimeMinutes, absoluteLastRefreshMs,
+                clientReloginAllowedBeforeLogout) {
+
+            @Override
+            public long getCreateMs() {
+                return super.getCreateMs() - 1000 * 60 * 60; // distant past
+            }
+        };
+        testLoginContextFactory.configure(mockLoginContext, 
testExpiringCredentialRefreshingLogin);
+
+        /*
+         * Perform the login, wait up to a certain amount of time for the 
refresher
+         * thread to exit, and make sure the correct calls happened at the 
correct times
+         */
+        long expectedFinalMs = startMs + numExpectedRefreshes * 1000 * 60 * 
refreshEveryMinutes;
+        
assertFalse(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        
assertFalse(testLoginContextFactory.refresherThreadDoneFuture().isDone());
+        testExpiringCredentialRefreshingLogin.login();
+        
assertTrue(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        testLoginContextFactory.refresherThreadDoneFuture().get(1L, 
TimeUnit.SECONDS);
+        assertEquals(expectedFinalMs, mockTime.milliseconds());
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            KafkaFutureImpl<Long> waiter = waiters.get(i);
+            assertTrue(waiter.isDone());
+            assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, 
waiter.get().longValue() - startMs);
+        }
+        assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+        InOrder inOrder = inOrder(mockLoginContext);
+        inOrder.verify(mockLoginContext).login();
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            inOrder.verify(mockLoginContext).login();
+            inOrder.verify(mockLoginContext).logout();
+        }
+    }
+
+    @Test
     public void testRefreshWithMinPeriodIntrusion() throws Exception {
         int numExpectedRefreshes = 1;
         boolean clientReloginAllowedBeforeLogout = true;

Reply via email to