Author: omalley
Date: Fri Mar 4 04:17:11 2011
New Revision: 1077459
URL: http://svn.apache.org/viewvc?rev=1077459&view=rev
Log:
commit 3eeb6dcd900c8b46028f00991d608bf5a6a8d01b
Author: Devaraj Das <[email protected]>
Date: Tue May 11 10:56:22 2010 -0700
HADOOP-6706 from
https://issues.apache.org/jira/secure/attachment/12444134/6706-bp20-2.patch
+++ b/YAHOO-CHANGES.txt
+ HADOOP-6706. Fix on top of the earlier patch. Closes the connection
+ on a SASL connection failure, and retries again with a new
+ connection. (ddas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java?rev=1077459&r1=1077458&r2=1077459&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Client.java Fri Mar 4 04:17:11 2011
@@ -36,6 +36,7 @@ import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.Random;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -351,53 +352,100 @@ public class Client {
}
}
+ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ UserGroupInformation currentUser =
+ UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUser = currentUser.getRealUser();
+ if (authMethod == AuthMethod.KERBEROS &&
+ // relogin only in case it is the login user (e.g. JT)
+ // or superuser (like oozie).
+ (currentUser.equals(loginUser) || loginUser.equals(realUser))) {
+ return true;
+ }
+ return false;
+ }
+
private synchronized boolean setupSaslConnection(final InputStream in2,
final OutputStream out2)
throws IOException {
- try {
- saslRpcClient = new SaslRpcClient(authMethod, token,
- serverPrincipal);
- return saslRpcClient.saslConnect(in2, out2);
- } catch (Exception e) {
- LOG.warn("Exception encountered while connecting to the server : " +
- e.getMessage() + ". Will attempt a relogin");
- /*
- * Catch all exceptions here. Most likely we would have hit one of
- * the kerberos exceptions. Just attempt to relogin and try to
- * connect to the server
- */
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- UserGroupInformation currentUser =
- UserGroupInformation.getCurrentUser();
- UserGroupInformation realUser = currentUser.getRealUser();
- if (authMethod == AuthMethod.KERBEROS &&
- // relogin only in case it is the login user (e.g. JT)
- // or superuser (like oozie).
- (currentUser.equals(loginUser) || loginUser.equals(realUser))) {
- //try setting up the connection again
- try {
- //try re-login
- if (UserGroupInformation.isLoginKeytabBased()) {
- loginUser.reloginFromKeytab();
+ saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal);
+ return saslRpcClient.saslConnect(in2, out2);
+ }
+
+ private synchronized void setupConnection() throws IOException {
+ short ioFailures = 0;
+ short timeoutFailures = 0;
+ while (true) {
+ try {
+ this.socket = socketFactory.createSocket();
+ this.socket.setTcpNoDelay(tcpNoDelay);
+ // connection time out is 20s
+ NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+ this.socket.setSoTimeout(pingInterval);
+ return;
+ } catch (SocketTimeoutException toe) {
+ /* The max number of retries is 45,
+ * which amounts to 20s*45 = 15 minutes retries.
+ */
+ handleConnectionFailure(timeoutFailures++, 45, toe);
+ } catch (IOException ie) {
+ handleConnectionFailure(ioFailures++, maxRetries, ie);
+ }
+ }
+ }
+ /**
+ * If multiple clients with the same principal try to connect
+ * to the same server at the same time, the server assumes a
+ * replay attack is in progress. This is a feature of kerberos.
+ * In order to work around this, what is done is that the client
+ * backs off randomly and tries to initiate the connection
+ * again.
+ * The other problem is to do with ticket expiry. To handle that,
+ * a relogin is attempted.
+ */
+ private synchronized void handleSaslConnectionFailure(
+ final int currRetries,
+ final int maxRetries, final Exception ex, final Random rand,
+ final UserGroupInformation ugi)
+ throws IOException, InterruptedException{
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException, InterruptedException {
+ final short MAX_BACKOFF = 5000;
+ closeConnection();
+ if (shouldAuthenticateOverKrb()) {
+ if (currRetries < maxRetries) {
+ LOG.debug("Exception encountered while connecting to " +
+ "the server : " + ex);
+ //try re-login
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ } else {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ disposeSasl();
+ //have granularity of milliseconds
+ //we are sleeping with the Connection lock held but since this
+ //connection instance is being used for connecting to the server
+ //in question, it is okay
+ Thread.sleep((rand.nextInt(MAX_BACKOFF) + 1));
+ return null;
} else {
- loginUser.reloginFromTicketCache();
+ String msg = "Couldn't setup connection for " +
+ UserGroupInformation.getLoginUser().getUserName() +
+ " to " + serverPrincipal;
+ LOG.warn(msg);
+ throw (IOException) new IOException(msg).initCause(ex);
}
- disposeSasl();
- saslRpcClient = new SaslRpcClient(authMethod, token,
- serverPrincipal);
- return saslRpcClient.saslConnect(in2, out2);
- } catch (Exception ex) {
- String msg = "Couldn't setup connection for " +
- loginUser.getUserName() +
- " to " + serverPrincipal + " even after relogin.";
- LOG.warn(msg);
- throw (IOException) new IOException(msg).initCause(ex);
+ } else {
+ LOG.warn("Exception encountered while connecting to " +
+ "the server : " + ex);
}
+ if (ex instanceof RemoteException)
+ throw (RemoteException)ex;
+ throw new IOException(ex);
}
- if (e instanceof RemoteException)
- throw (RemoteException)e;
- throw new IOException(e);
- }
+ });
}
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
@@ -407,80 +455,87 @@ public class Client {
if (socket != null || shouldCloseConnection.get()) {
return;
}
-
- short ioFailures = 0;
- short timeoutFailures = 0;
+
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
+ short numRetries = 0;
+ final short MAX_RETRIES = 5;
+ Random rand = null;
while (true) {
- try {
- this.socket = socketFactory.createSocket();
- this.socket.setTcpNoDelay(tcpNoDelay);
- // connection time out is 20s
- NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
- this.socket.setSoTimeout(pingInterval);
- break;
- } catch (SocketTimeoutException toe) {
- /* The max number of retries is 45,
- * which amounts to 20s*45 = 15 minutes retries.
- */
- handleConnectionFailure(timeoutFailures++, 45, toe);
- } catch (IOException ie) {
- handleConnectionFailure(ioFailures++, maxRetries, ie);
- }
- }
- InputStream inStream = NetUtils.getInputStream(socket);
- OutputStream outStream = NetUtils.getOutputStream(socket);
- writeRpcHeader(outStream);
- if (useSasl) {
- final InputStream in2 = inStream;
- final OutputStream out2 = outStream;
- UserGroupInformation ticket = remoteId.getTicket();
- if (authMethod == AuthMethod.KERBEROS) {
- if (ticket.getRealUser() != null) {
- ticket = ticket.getRealUser();
+ setupConnection();
+ InputStream inStream = NetUtils.getInputStream(socket);
+ OutputStream outStream = NetUtils.getOutputStream(socket);
+ writeRpcHeader(outStream);
+ if (useSasl) {
+ final InputStream in2 = inStream;
+ final OutputStream out2 = outStream;
+ UserGroupInformation ticket = remoteId.getTicket();
+ if (authMethod == AuthMethod.KERBEROS) {
+ if (ticket.getRealUser() != null) {
+ ticket = ticket.getRealUser();
+ }
}
- }
- if (ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
- @Override
- public Boolean run() throws IOException {
- try {
- return setupSaslConnection(in2, out2);
- } catch (IOException ie) {
- handleConnectionFailure(1, 1, ie);
- throw ie;
+ boolean continueSasl = false;
+ try {
+ continueSasl =
+ ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return setupSaslConnection(in2, out2);
+ }
+ });
+ } catch (Exception ex) {
+ if (rand == null) {
+ rand = new Random();
}
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
+ ticket);
+ continue;
+ }
+ if (continueSasl) {
+ // Sasl connect is successful. Let's set up Sasl i/o streams.
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ } else {
+ // fall back to simple auth because server told us so.
+ authMethod = AuthMethod.SIMPLE;
+ header = new ConnectionHeader(header.getProtocol(),
+ header.getUgi(), authMethod);
+ useSasl = false;
}
- })) {
- // Sasl connect is successful. Let's set up Sasl i/o streams.
- inStream = saslRpcClient.getInputStream(inStream);
- outStream = saslRpcClient.getOutputStream(outStream);
- } else {
- // fall back to simple auth because server told us so.
- authMethod = AuthMethod.SIMPLE;
- header = new ConnectionHeader(header.getProtocol(),
- header.getUgi(), authMethod);
- useSasl = false;
}
+ this.in = new DataInputStream(new BufferedInputStream
+ (new PingInputStream(inStream)));
+ this.out = new DataOutputStream
+ (new BufferedOutputStream(outStream));
+ writeHeader();
+
+ // update last activity time
+ touch();
+
+ // start the receiver thread after the socket connection has been set up
+ start();
+ return;
}
- this.in = new DataInputStream(new BufferedInputStream
- (new PingInputStream(inStream)));
- this.out = new DataOutputStream
- (new BufferedOutputStream(outStream));
- writeHeader();
-
- // update last activity time
- touch();
-
- // start the receiver thread after the socket connection has been set up
- start();
} catch (IOException e) {
markClosed(e);
close();
}
}
+
+ private void closeConnection() {
+ // close the current connection
+ try {
+ socket.close();
+ } catch (IOException e) {
+ LOG.warn("Not able to close a socket", e);
+ }
+ // set socket to null so that the next call to setupIOstreams
+ // can start the process of connect all over again.
+ socket = null;
+ }
/* Handle connection failures
*
@@ -498,15 +553,8 @@ public class Client {
*/
private void handleConnectionFailure(
int curRetries, int maxRetries, IOException ioe) throws IOException {
- // close the current connection
- try {
- socket.close();
- } catch (IOException e) {
- LOG.warn("Not able to close a socket", e);
- }
- // set socket to null so that the next call to setupIOstreams
- // can start the process of connect all over again.
- socket = null;
+
+ closeConnection();
// throw the exception if the maximum number of retries is reached
if (curRetries >= maxRetries) {