Author: ddas Date: Tue Aug 3 00:51:57 2010 New Revision: 981714 URL: http://svn.apache.org/viewvc?rev=981714&view=rev Log: HADOOP-6706. Improves the handling of SASL failures due to expired tickets, and of other server-detected failures. Contributed by Jitendra Pandey and Devaraj Das.
Modified: hadoop/common/trunk/CHANGES.txt hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java Modified: hadoop/common/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=981714&r1=981713&r2=981714&view=diff ============================================================================== --- hadoop/common/trunk/CHANGES.txt (original) +++ hadoop/common/trunk/CHANGES.txt Tue Aug 3 00:51:57 2010 @@ -173,6 +173,9 @@ Trunk (unreleased changes) HADOOP-6873. using delegation token over hftp for long running clients (boryas) + HADOOP-6706. Improves the sasl failure handling due to expired tickets, + and other server detected failures. (Jitendra Pandey and ddas via ddas) + Release 0.21.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=981714&r1=981713&r2=981714&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java Tue Aug 3 00:51:57 2010 @@ -36,6 +36,7 @@ import java.io.OutputStream; import java.security.PrivilegedExceptionAction; import java.util.Hashtable; import java.util.Iterator; +import java.util.Random; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -367,53 +368,109 @@ public class Client { if (saslRpcClient != null) { try { saslRpcClient.dispose(); + saslRpcClient = null; } catch (IOException ignored) { } } } + private synchronized boolean shouldAuthenticateOverKrb() throws IOException { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser(); + UserGroupInformation realUser = currentUser.getRealUser(); + if (authMethod == AuthMethod.KERBEROS && loginUser != null && + // Make sure user logged in using Kerberos either keytab or TGT + loginUser.hasKerberosCredentials() && + // relogin only in case it is the login user (e.g. JT) + // or superuser (like oozie). + (loginUser.equals(currentUser) || loginUser.equals(realUser))) { + return true; + } + return false; + } + private synchronized boolean setupSaslConnection(final InputStream in2, final OutputStream out2) throws IOException { - try { - saslRpcClient = new SaslRpcClient(authMethod, token, - serverPrincipal); - return saslRpcClient.saslConnect(in2, out2); - } catch (javax.security.sasl.SaslException je) { - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - UserGroupInformation currentUser = - UserGroupInformation.getCurrentUser(); - UserGroupInformation realUser = currentUser.getRealUser(); - if (authMethod == AuthMethod.KERBEROS && - //try setting up the connection again - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). 
- ((currentUser != null && currentUser.equals(loginUser)) || - (realUser != null && realUser.equals(loginUser)))) { - try { - //try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - loginUser.reloginFromKeytab(); + saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal); + return saslRpcClient.saslConnect(in2, out2); + } + + private synchronized void setupConnection() throws IOException { + short ioFailures = 0; + short timeoutFailures = 0; + while (true) { + try { + this.socket = socketFactory.createSocket(); + this.socket.setTcpNoDelay(tcpNoDelay); + // connection time out is 20s + NetUtils.connect(this.socket, remoteId.getAddress(), 20000); + this.socket.setSoTimeout(pingInterval); + return; + } catch (SocketTimeoutException toe) { + /* + * The max number of retries is 45, which amounts to 20s*45 = 15 + * minutes retries. + */ + handleConnectionFailure(timeoutFailures++, 45, toe); + } catch (IOException ie) { + handleConnectionFailure(ioFailures++, maxRetries, ie); + } + } + } + + /** + * If multiple clients with the same principal try to connect to the same + * server at the same time, the server assumes a replay attack is in + * progress. This is a feature of kerberos. In order to work around this, + * what is done is that the client backs off randomly and tries to initiate + * the connection again. The other problem is to do with ticket expiry. To + * handle that, a relogin is attempted. 
+ */ + private synchronized void handleSaslConnectionFailure( + final int currRetries, final int maxRetries, final Exception ex, + final Random rand, final UserGroupInformation ugi) throws IOException, + InterruptedException { + ugi.doAs(new PrivilegedExceptionAction<Object>() { + public Object run() throws IOException, InterruptedException { + final short MAX_BACKOFF = 5000; + closeConnection(); + disposeSasl(); + if (shouldAuthenticateOverKrb()) { + if (currRetries < maxRetries) { + LOG.debug("Exception encountered while connecting to " + + "the server : " + ex); + // try re-login + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + } else { + UserGroupInformation.getLoginUser().reloginFromTicketCache(); + } + // have granularity of milliseconds + //we are sleeping with the Connection lock held but since this + //connection instance is being used for connecting to the server + //in question, it is okay + Thread.sleep((rand.nextInt(MAX_BACKOFF) + 1)); + return null; } else { - loginUser.reloginFromTicketCache(); + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + " to " + + serverPrincipal; + LOG.warn(msg); + throw (IOException) new IOException(msg).initCause(ex); } - disposeSasl(); - saslRpcClient = new SaslRpcClient(authMethod, token, - serverPrincipal); - return saslRpcClient.saslConnect(in2, out2); - } catch (javax.security.sasl.SaslException jee) { - LOG.warn("Couldn't setup connection for " + - loginUser.getUserName() + - " to " + serverPrincipal + " even after relogin."); - throw jee; - } catch (IOException ie) { - ie.initCause(je); - throw ie; + } else { + LOG.warn("Exception encountered while connecting to " + + "the server : " + ex); } - } - throw je; - } + if (ex instanceof RemoteException) + throw (RemoteException) ex; + throw new IOException(ex); + } + }); } + + /** Connect to the server and set up the I/O streams. 
It then sends * a header to the server and starts * the connection thread that waits for responses. @@ -421,81 +478,95 @@ public class Client { private synchronized void setupIOstreams() throws InterruptedException { if (socket != null || shouldCloseConnection.get()) { return; - } - - short ioFailures = 0; - short timeoutFailures = 0; + } try { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to "+server); } + short numRetries = 0; + final short MAX_RETRIES = 5; + Random rand = null; while (true) { - try { - this.socket = socketFactory.createSocket(); - this.socket.setTcpNoDelay(tcpNoDelay); - // connection time out is 20s - NetUtils.connect(this.socket, remoteId.getAddress(), 20000); - this.socket.setSoTimeout(pingInterval); - break; - } catch (SocketTimeoutException toe) { - /* The max number of retries is 45, - * which amounts to 20s*45 = 15 minutes retries. - */ - handleConnectionFailure(timeoutFailures++, 45, toe); - } catch (IOException ie) { - handleConnectionFailure(ioFailures++, maxRetries, ie); - } - } - InputStream inStream = NetUtils.getInputStream(socket); - OutputStream outStream = NetUtils.getOutputStream(socket); - writeRpcHeader(outStream); - if (useSasl) { - final InputStream in2 = inStream; - final OutputStream out2 = outStream; - UserGroupInformation ticket = remoteId.getTicket(); - if (authMethod == AuthMethod.KERBEROS) { - if (ticket.getRealUser() != null) { - ticket = ticket.getRealUser(); + setupConnection(); + InputStream inStream = NetUtils.getInputStream(socket); + OutputStream outStream = NetUtils.getOutputStream(socket); + writeRpcHeader(outStream); + if (useSasl) { + final InputStream in2 = inStream; + final OutputStream out2 = outStream; + UserGroupInformation ticket = remoteId.getTicket(); + if (authMethod == AuthMethod.KERBEROS) { + if (ticket.getRealUser() != null) { + ticket = ticket.getRealUser(); + } } - } - if (ticket.doAs(new PrivilegedExceptionAction<Boolean>() { - @Override - public Boolean run() throws IOException { - 
return setupSaslConnection(in2, out2); + boolean continueSasl = false; + try { + continueSasl = ticket + .doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws IOException { + return setupSaslConnection(in2, out2); + } + }); + } catch (Exception ex) { + if (rand == null) { + rand = new Random(); + } + handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, + ticket); + continue; + } + if (continueSasl) { + // Sasl connect is successful. Let's set up Sasl i/o streams. + inStream = saslRpcClient.getInputStream(inStream); + outStream = saslRpcClient.getOutputStream(outStream); + } else { + // fall back to simple auth because server told us so. + authMethod = AuthMethod.SIMPLE; + header = new ConnectionHeader(header.getProtocol(), header + .getUgi(), authMethod); + useSasl = false; } - })) { - // Sasl connect is successful. Let's set up Sasl i/o streams. - inStream = saslRpcClient.getInputStream(inStream); - outStream = saslRpcClient.getOutputStream(outStream); + } + + if (doPing) { + this.in = new DataInputStream(new BufferedInputStream( + new PingInputStream(inStream))); } else { - // fall back to simple auth because server told us so. 
- authMethod = AuthMethod.SIMPLE; - header = new ConnectionHeader(header.getProtocol(), - header.getUgi(), authMethod); - useSasl = false; + this.in = new DataInputStream(new BufferedInputStream(inStream)); } - } - if (doPing) { - this.in = new DataInputStream(new BufferedInputStream - (new PingInputStream(inStream))); - } else { - this.in = new DataInputStream(new BufferedInputStream - (inStream)); - } - this.out = new DataOutputStream - (new BufferedOutputStream(outStream)); - writeHeader(); + this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + writeHeader(); - // update last activity time - touch(); + // update last activity time + touch(); - // start the receiver thread after the socket connection has been set up - start(); + // start the receiver thread after the socket connection has been set + // up + start(); + return; + } } catch (IOException e) { markClosed(e); close(); } } + + private void closeConnection() { + if (socket == null) { + return; + } + // close the current connection + try { + socket.close(); + } catch (IOException e) { + LOG.warn("Not able to close a socket", e); + } + // set socket to null so that the next call to setupIOstreams + // can start the process of connect all over again. + socket = null; + } /* Handle connection failures * @@ -513,17 +584,8 @@ public class Client { */ private void handleConnectionFailure( int curRetries, int maxRetries, IOException ioe) throws IOException { - // close the current connection - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - LOG.warn("Not able to close a socket", e); - } - } - // set socket to null so that the next call to setupIOstreams - // can start the process of connect all over again. 
- socket = null; + + closeConnection(); // throw the exception if the maximum number of retries is reached if (curRetries >= maxRetries) { Modified: hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java?rev=981714&r1=981713&r2=981714&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java Tue Aug 3 00:51:57 2010 @@ -232,6 +232,7 @@ public class UserGroupInformation { // All non-static fields must be read-only caches that come from the subject. private final User user; private final boolean isKeytab; + private final boolean isKrbTkt; private static final String OS_LOGIN_MODULE_NAME; private static final Class<? extends Principal> OS_PRINCIPAL_CLASS; @@ -373,6 +374,15 @@ public class UserGroupInformation { this.subject = subject; this.user = subject.getPrincipals(User.class).iterator().next(); this.isKeytab = !subject.getPrivateCredentials(KerberosKey.class).isEmpty(); + this.isKrbTkt = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty(); + } + + /** + * checks if logged in using kerberos + * @return true if the subject logged via keytab or has a Kerberos TGT + */ + public boolean hasKerberosCredentials() { + return isKeytab || isKrbTkt; } /** @@ -602,7 +612,7 @@ public class UserGroupInformation { throws IOException { if (!isSecurityEnabled() || user.getAuthenticationMethod() != AuthenticationMethod.KERBEROS || - isKeytab) + !isKrbTkt) return; LoginContext login = getLogin(); if (login == null) { @@ -726,10 +736,9 @@ public class UserGroupInformation { /** * Create a proxy user using username of the effective user and the ugi of the * real user. - * - * @param effective - * user, UGI for real user. 
- * @return + * @param user + * @param realUser + * @return proxyUser ugi */ public static UserGroupInformation createProxyUser(String user, UserGroupInformation realUser) {