Author: ddas Date: Fri Feb 19 09:04:47 2010 New Revision: 911748 URL: http://svn.apache.org/viewvc?rev=911748&view=rev Log: HADOOP-6572. Makes sure that SASL encryption and the push to the responder queue for the RPC response happen atomically. Contributed by Kan Zhang.
Modified: hadoop/common/trunk/CHANGES.txt hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Modified: hadoop/common/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=911748&r1=911747&r2=911748&view=diff ============================================================================== --- hadoop/common/trunk/CHANGES.txt (original) +++ hadoop/common/trunk/CHANGES.txt Fri Feb 19 09:04:47 2010 @@ -217,6 +217,9 @@ HADOOP-6558. Return null in HarFileSystem.getFileChecksum(..) since no checksum algorithm is implemented. (szetszwo) + HADOOP-6572. Makes sure that SASL encryption and push to responder + queue for the RPC response happens atomically. (Kan Zhang via ddas) + Release 0.21.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=911748&r1=911747&r2=911748&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Fri Feb 19 09:04:47 2010 @@ -1198,17 +1198,22 @@ error = StringUtils.stringifyException(e); } CurCall.set(null); - setupResponse(buf, call, - (error == null) ? Status.SUCCESS : Status.ERROR, - value, errorClass, error); - // Discard the large buf and reset it back to - // smaller size to freeup heap - if (buf.size() > MAX_RESP_BUF_SIZE) { - LOG.warn("Large response size " + buf.size() + " for call " + - call.toString()); - buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); + synchronized (call.connection.responseQueue) { + // setupResponse() needs to be sync'ed together with + // responder.doResponse() since setupResponse may use + // SASL to encrypt response data and SASL enforces + // its own message ordering. + setupResponse(buf, call, (error == null) ? 
Status.SUCCESS + : Status.ERROR, value, errorClass, error); + // Discard the large buf and reset it back to + // smaller size to freeup heap + if (buf.size() > MAX_RESP_BUF_SIZE) { + LOG.warn("Large response size " + buf.size() + " for call " + + call.toString()); + buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); + } + responder.doRespond(call); } - responder.doRespond(call); } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " +