This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 74ad46d KAFKA-7945: Calc refresh time correctly when token created in the past (#6288) 74ad46d is described below commit 74ad46d880884f9b1435ef10d813a4c6ca843240 Author: Ron Dagostino <rndg...@gmail.com> AuthorDate: Wed Feb 20 16:26:41 2019 -0500 KAFKA-7945: Calc refresh time correctly when token created in the past (#6288) Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> --- .../ExpiringCredentialRefreshingLogin.java | 11 ++- .../ExpiringCredentialRefreshingLoginTest.java | 96 +++++++++++++++++++++- 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java index dcdf3cf..ab2f303 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java @@ -81,6 +81,13 @@ public abstract class ExpiringCredentialRefreshingLogin implements AutoCloseable loginContextFactory.refresherThreadDone(); return; } + // safety check motivated by KAFKA-7945, + // should generally never happen except due to a bug + if (nextRefreshMs.longValue() < nowMs) { + log.warn("[Principal={}]: Expiring credential re-login sleep time was calculated to be in the past! Will explicitly adjust. ({})", principalLogText(), + new Date(nextRefreshMs)); + nextRefreshMs = Long.valueOf(nowMs + 10 * 1000); // refresh in 10 seconds + } log.info("[Principal={}]: Expiring credential re-login sleeping until: {}", principalLogText(), new Date(nextRefreshMs)); time.sleep(nextRefreshMs - nowMs); @@ -307,7 +314,7 @@ public abstract class ExpiringCredentialRefreshingLogin implements AutoCloseable return null; } Long optionalStartTime = expiringCredential.startTimeMs(); - long startMs = optionalStartTime != null ? optionalStartTime.longValue() : currentMs(); + long startMs = optionalStartTime != null ? optionalStartTime.longValue() : relativeToMs; log.info("[Principal={}]: Expiring credential valid from {} to {}", expiringCredential.principalName(), new java.util.Date(startMs), new java.util.Date(expireTimeMs)); @@ -320,7 +327,7 @@ public abstract class ExpiringCredentialRefreshingLogin implements AutoCloseable long refreshMinPeriodSeconds = expiringCredentialRefreshConfig.loginRefreshMinPeriodSeconds(); long clientRefreshBufferSeconds = expiringCredentialRefreshConfig.loginRefreshBufferSeconds(); if (relativeToMs + 1000L * (refreshMinPeriodSeconds + clientRefreshBufferSeconds) > expireTimeMs) { - long retvalRefreshMs = startMs + (long) ((expireTimeMs - startMs) * pct); + long retvalRefreshMs = relativeToMs + (long) ((expireTimeMs - relativeToMs) * pct); log.warn( "[Principal={}]: Expiring credential expires at {}, so buffer times of {} and {} seconds" + " at the front and back, respectively, cannot be accommodated. We will refresh at {}.", diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java index d0008f8..3a39242 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java @@ -84,6 +84,14 @@ public class ExpiringCredentialRefreshingLoginTest { this.clientReloginAllowedBeforeLogout = clientReloginAllowedBeforeLogout; } + public long getCreateMs() { + return time.milliseconds(); + } + + public long getExpireTimeMs() { + return time.milliseconds() + lifetimeMillis; + } + /* * Invoke at login time */ @@ -130,8 +138,8 @@ public class ExpiringCredentialRefreshingLoginTest { private ExpiringCredential internalNewExpiringCredential() { return new ExpiringCredential() { - private final long createMs = time.milliseconds(); - private final long expireTimeMs = createMs + lifetimeMillis; + private final long createMs = getCreateMs(); + private final long expireTimeMs = getExpireTimeMs(); @Override public String principalName() { @@ -443,6 +451,90 @@ public class ExpiringCredentialRefreshingLoginTest { } @Test + public void testRefreshWithExpirationSmallerThanConfiguredBuffersAndOlderCreateTime() throws Exception { + int numExpectedRefreshes = 1; + boolean clientReloginAllowedBeforeLogout = true; + final LoginContext mockLoginContext = mock(LoginContext.class); + Subject subject = new Subject(); + when(mockLoginContext.getSubject()).thenReturn(subject); + + MockTime mockTime = new MockTime(); + long startMs = mockTime.milliseconds(); + /* + * Identify the lifetime of each expiring credential + */ + long lifetimeMinutes = 10L; + /* + * Identify the point at which refresh will occur in that lifetime + */ + long refreshEveryMinutes = 8L; + /* + * Set an absolute last refresh time that will cause the login thread to exit + * after a certain number of re-logins (by adding an extra half of a refresh + * interval). + */ + long absoluteLastRefreshMs = startMs + (1 + numExpectedRefreshes) * 1000 * 60 * refreshEveryMinutes + - 1000 * 60 * refreshEveryMinutes / 2; + /* + * Identify buffer time on either side for the refresh algorithm that will cause + * the entire lifetime to be taken up. In other words, make sure there is no way + * to honor the buffers. + */ + short minPeriodSeconds = (short) (1 + lifetimeMinutes * 60 / 2); + short bufferSeconds = minPeriodSeconds; + + /* + * Define some listeners so we can keep track of who gets done and when. All + * added listeners should end up done except the last, extra one, which should + * not. + */ + MockScheduler mockScheduler = new MockScheduler(mockTime); + List<KafkaFutureImpl<Long>> waiters = addWaiters(mockScheduler, 1000 * 60 * refreshEveryMinutes, + numExpectedRefreshes + 1); + + // Create the ExpiringCredentialRefreshingLogin instance under test + TestLoginContextFactory testLoginContextFactory = new TestLoginContextFactory(); + TestExpiringCredentialRefreshingLogin testExpiringCredentialRefreshingLogin = new TestExpiringCredentialRefreshingLogin( + refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime( + 1.0 * refreshEveryMinutes / lifetimeMinutes, minPeriodSeconds, bufferSeconds, + clientReloginAllowedBeforeLogout), + testLoginContextFactory, mockTime, 1000 * 60 * lifetimeMinutes, absoluteLastRefreshMs, + clientReloginAllowedBeforeLogout) { + + @Override + public long getCreateMs() { + return super.getCreateMs() - 1000 * 60 * 60; // distant past + } + }; + testLoginContextFactory.configure(mockLoginContext, testExpiringCredentialRefreshingLogin); + + /* + * Perform the login, wait up to a certain amount of time for the refresher + * thread to exit, and make sure the correct calls happened at the correct times + */ + long expectedFinalMs = startMs + numExpectedRefreshes * 1000 * 60 * refreshEveryMinutes; + assertFalse(testLoginContextFactory.refresherThreadStartedFuture().isDone()); + assertFalse(testLoginContextFactory.refresherThreadDoneFuture().isDone()); + testExpiringCredentialRefreshingLogin.login(); + assertTrue(testLoginContextFactory.refresherThreadStartedFuture().isDone()); + testLoginContextFactory.refresherThreadDoneFuture().get(1L, TimeUnit.SECONDS); + assertEquals(expectedFinalMs, mockTime.milliseconds()); + for (int i = 0; i < numExpectedRefreshes; ++i) { + KafkaFutureImpl<Long> waiter = waiters.get(i); + assertTrue(waiter.isDone()); + assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs); + } + assertFalse(waiters.get(numExpectedRefreshes).isDone()); + + InOrder inOrder = inOrder(mockLoginContext); + inOrder.verify(mockLoginContext).login(); + for (int i = 0; i < numExpectedRefreshes; ++i) { + inOrder.verify(mockLoginContext).login(); + inOrder.verify(mockLoginContext).logout(); + } + } + + @Test public void testRefreshWithMinPeriodIntrusion() throws Exception { int numExpectedRefreshes = 1; boolean clientReloginAllowedBeforeLogout = true;