This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new f38f35e  KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER 
refresh login fails (#6233)
f38f35e is described below

commit f38f35ea84328fa5020a5be9471fad930973effc
Author: Ron Dagostino <[email protected]>
AuthorDate: Thu Feb 7 13:06:01 2019 -0500

    KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login 
fails (#6233)
    
    Replaces original loginContext if login fails in the refresh thread to 
ensure that the refresh thread is left in a clean state when there are 
exceptions while connecting to an OAuth server. Also makes client callback 
handler more robust by using the token with the longest remaining time for 
expiry instead of throwing an exception if multiple tokens are found.
    
    Reviewers: Rajini Sivaram <[email protected]>
---
 .../OAuthBearerSaslClientCallbackHandler.java      |  41 ++++++++-
 .../ExpiringCredentialRefreshingLogin.java         |  22 +++--
 .../OAuthBearerSaslClienCallbackHandlerTest.java   | 102 +++++++++++++++++++++
 .../ExpiringCredentialRefreshingLoginTest.java     |  79 +++++++++++++++-
 4 files changed, 230 insertions(+), 14 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
index 352e6b6..bca55be 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
@@ -19,9 +19,13 @@ package 
org.apache.kafka.common.security.oauthbearer.internals;
 import java.io.IOException;
 import java.security.AccessController;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -34,6 +38,8 @@ import org.apache.kafka.common.security.auth.SaslExtensions;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of {@code AuthenticateCallbackHandler} that recognizes
@@ -49,6 +55,7 @@ import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
  * configuration property.
  */
 public class OAuthBearerSaslClientCallbackHandler implements 
AuthenticateCallbackHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerSaslClientCallbackHandler.class);
     private boolean configured = false;
 
     /**
@@ -93,11 +100,35 @@ public class OAuthBearerSaslClientCallbackHandler 
implements AuthenticateCallbac
         Set<OAuthBearerToken> privateCredentials = subject != null
             ? subject.getPrivateCredentials(OAuthBearerToken.class)
             : Collections.emptySet();
-        if (privateCredentials.size() != 1)
-            throw new IOException(
-                    String.format("Unable to find OAuth Bearer token in 
Subject's private credentials (size=%d)",
-                            privateCredentials.size()));
-        callback.token(privateCredentials.iterator().next());
+        if (privateCredentials.size() == 0)
+            throw new IOException("No OAuth Bearer tokens in Subject's private 
credentials");
+        if (privateCredentials.size() == 1)
+            callback.token(privateCredentials.iterator().next());
+        else {
+            /*
+             * There a very small window of time upon token refresh (on the 
order of milliseconds)
+             * where both an old and a new token appear on the Subject's 
private credentials.
+             * Rather than implement a lock to eliminate this window, we will 
deal with it by
+             * checking for the existence of multiple tokens and choosing the 
one that has the
+             * longest lifetime.  It is also possible that a bug could cause 
multiple tokens to
+             * exist (e.g. KAFKA-7902), so dealing with the unlikely 
possibility that occurs
+             * during normal operation also allows us to deal more robustly 
with potential bugs.
+             */
+            SortedSet<OAuthBearerToken> sortedByLifetime =
+                new TreeSet<>(
+                    new Comparator<OAuthBearerToken>() {
+                        @Override
+                        public int compare(OAuthBearerToken o1, 
OAuthBearerToken o2) {
+                            return Long.compare(o1.lifetimeMs(), 
o2.lifetimeMs());
+                        }
+                    });
+            sortedByLifetime.addAll(privateCredentials);
+            log.warn("Found {} OAuth Bearer tokens in Subject's private 
credentials; the oldest expires at {}, will use the newest, which expires at 
{}",
+                sortedByLifetime.size(),
+                new Date(sortedByLifetime.first().lifetimeMs()),
+                new Date(sortedByLifetime.last().lifetimeMs()));
+            callback.token(sortedByLifetime.last());
+        }
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
index f6bb264..dcdf3cf 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
@@ -375,13 +375,21 @@ public abstract class ExpiringCredentialRefreshingLogin 
implements AutoCloseable
              */
             ExpiringCredential optionalCredentialToLogout = expiringCredential;
             LoginContext optionalLoginContextToLogout = loginContext;
-            loginContext = 
loginContextFactory.createLoginContext(ExpiringCredentialRefreshingLogin.this);
-            log.info("Initiating re-login for {}, logout() still needs to be 
called on a previous login = {}",
-                    principalName, optionalCredentialToLogout != null);
-            loginContext.login();
-            // Perform a logout() on any original credential if necessary
-            if (optionalCredentialToLogout != null)
-                optionalLoginContextToLogout.logout();
+            boolean cleanLogin = false; // remember to restore the original if 
necessary
+            try {
+                loginContext = 
loginContextFactory.createLoginContext(ExpiringCredentialRefreshingLogin.this);
+                log.info("Initiating re-login for {}, logout() still needs to 
be called on a previous login = {}",
+                        principalName, optionalCredentialToLogout != null);
+                loginContext.login();
+                cleanLogin = true; // no need to restore the original
+                // Perform a logout() on any original credential if necessary
+                if (optionalCredentialToLogout != null)
+                    optionalLoginContextToLogout.logout();
+            } finally {
+                if (!cleanLogin)
+                    // restore the original
+                    loginContext = optionalLoginContextToLogout;
+            }
             /*
              * Get the new credential and make sure it is not any old one that 
required a
              * logout() after the login()
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
new file mode 100644
index 0000000..4115c52
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Set;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+
+import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
+import org.junit.Test;
+
+public class OAuthBearerSaslClienCallbackHandlerTest {
+    private static OAuthBearerToken createTokenWithLifetimeMillis(final long 
lifetimeMillis) {
+        return new OAuthBearerToken() {
+            @Override
+            public String value() {
+                return null;
+            }
+
+            @Override
+            public Long startTimeMs() {
+                return null;
+            }
+
+            @Override
+            public Set<String> scope() {
+                return null;
+            }
+
+            @Override
+            public String principalName() {
+                return null;
+            }
+
+            @Override
+            public long lifetimeMs() {
+                return lifetimeMillis;
+            }
+        };
+    }
+
+    @Test(expected = IOException.class)
+    public void testWithZeroTokens() throws Throwable {
+        OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
+        try {
+            Subject.doAs(new Subject(), (PrivilegedExceptionAction<Void>) () 
-> {
+                OAuthBearerTokenCallback callback = new 
OAuthBearerTokenCallback();
+                handler.handle(new Callback[] {callback});
+                return null;
+            });
+        } catch (PrivilegedActionException e) {
+            throw e.getCause();
+        }
+    }
+
+    @Test()
+    public void testWithPotentiallyMultipleTokens() throws Exception {
+        OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
+        Subject.doAs(new Subject(), (PrivilegedExceptionAction<Void>) () -> {
+            final int maxTokens = 4;
+            final Set<Object> privateCredentials = 
Subject.getSubject(AccessController.getContext())
+                    .getPrivateCredentials();
+            privateCredentials.clear();
+            for (int num = 1; num <= maxTokens; ++num) {
+                privateCredentials.add(createTokenWithLifetimeMillis(num));
+                OAuthBearerTokenCallback callback = new 
OAuthBearerTokenCallback();
+                handler.handle(new Callback[] {callback});
+                assertEquals(num, callback.token().lifetimeMs());
+            }
+            return null;
+        });
+    }
+
+    private static OAuthBearerSaslClientCallbackHandler 
createCallbackHandler() {
+        OAuthBearerSaslClientCallbackHandler handler = new 
OAuthBearerSaslClientCallbackHandler();
+        handler.configure(Collections.emptyMap(), 
OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+                Collections.emptyList());
+        return handler;
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index cc0b983..d0008f8 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 import org.mockito.InOrder;
+import org.mockito.Mockito;
 
 public class ExpiringCredentialRefreshingLoginTest {
     private static final Configuration EMPTY_WILDCARD_CONFIGURATION;
@@ -232,8 +233,28 @@ public class ExpiringCredentialRefreshingLoginTest {
         }
 
         @Override
-        public LoginContext 
createLoginContext(ExpiringCredentialRefreshingLogin 
expiringCredentialRefreshingLogin) {
-            return testLoginContext;
+        public LoginContext 
createLoginContext(ExpiringCredentialRefreshingLogin 
expiringCredentialRefreshingLogin) throws LoginException {
+            return new LoginContext("", null, null, 
EMPTY_WILDCARD_CONFIGURATION) {
+                private boolean loginSuccess = false;
+                @Override
+                public void login() throws LoginException {
+                    testLoginContext.login();
+                    loginSuccess = true;
+                }
+        
+                @Override
+                public void logout() throws LoginException {
+                    if (!loginSuccess)
+                        // will cause the refresher thread to exit
+                        throw new IllegalStateException("logout called without 
a successful login");
+                    testLoginContext.logout();
+                }
+        
+                @Override
+                public Subject getSubject() {
+                    return testLoginContext.getSubject();
+                }
+            };
         }
 
         @Override
@@ -582,6 +603,60 @@ public class ExpiringCredentialRefreshingLoginTest {
         }
     }
 
+    @Test
+    public void testLoginExceptionCausesCorrectLogout() throws Exception {
+        int numExpectedRefreshes = 3;
+        boolean clientReloginAllowedBeforeLogout = true;
+        Subject subject = new Subject();
+        final LoginContext mockLoginContext = mock(LoginContext.class);
+        when(mockLoginContext.getSubject()).thenReturn(subject);
+        Mockito.doNothing().doThrow(new 
LoginException()).doNothing().when(mockLoginContext).login();
+
+        MockTime mockTime = new MockTime();
+        long startMs = mockTime.milliseconds();
+        /*
+         * Identify the lifetime of each expiring credential
+         */
+        long lifetimeMinutes = 100L;
+        /*
+         * Identify the point at which refresh will occur in that lifetime
+         */
+        long refreshEveryMinutes = 80L;
+        /*
+         * Set an absolute last refresh time that will cause the login thread 
to exit
+         * after a certain number of re-logins (by adding an extra half of a 
refresh
+         * interval).
+         */
+        long absoluteLastRefreshMs = startMs + (1 + numExpectedRefreshes) * 
1000 * 60 * refreshEveryMinutes
+                - 1000 * 60 * refreshEveryMinutes / 2;
+        /*
+         * Identify buffer time on either side for the refresh algorithm
+         */
+        short minPeriodSeconds = (short) 0;
+        short bufferSeconds = minPeriodSeconds;
+
+        // Create the ExpiringCredentialRefreshingLogin instance under test
+        TestLoginContextFactory testLoginContextFactory = new 
TestLoginContextFactory();
+        TestExpiringCredentialRefreshingLogin 
testExpiringCredentialRefreshingLogin = new 
TestExpiringCredentialRefreshingLogin(
+                refreshConfigThatPerformsReloginEveryGivenPercentageOfLifetime(
+                        1.0 * refreshEveryMinutes / lifetimeMinutes, 
minPeriodSeconds, bufferSeconds,
+                        clientReloginAllowedBeforeLogout),
+                testLoginContextFactory, mockTime, 1000 * 60 * 
lifetimeMinutes, absoluteLastRefreshMs,
+                clientReloginAllowedBeforeLogout);
+        testLoginContextFactory.configure(mockLoginContext, 
testExpiringCredentialRefreshingLogin);
+
+        /*
+         * Perform the login and wait up to a certain amount of time for the 
refresher
+         * thread to exit.  A timeout indicates the thread died due to logout()
+         * being invoked on an instance where the login() invocation had 
failed.
+         */
+        
assertFalse(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        
assertFalse(testLoginContextFactory.refresherThreadDoneFuture().isDone());
+        testExpiringCredentialRefreshingLogin.login();
+        
assertTrue(testLoginContextFactory.refresherThreadStartedFuture().isDone());
+        testLoginContextFactory.refresherThreadDoneFuture().get(1L, 
TimeUnit.SECONDS);
+    }
+
     private static List<KafkaFutureImpl<Long>> addWaiters(MockScheduler 
mockScheduler, long refreshEveryMillis,
             int numWaiters) {
         List<KafkaFutureImpl<Long>> retvalWaiters = new 
ArrayList<>(numWaiters);

Reply via email to