This is an automated email from the ASF dual-hosted git repository. elserj pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push: new fc09cdf HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud) fc09cdf is described below commit fc09cdff30bf1b91e19360fcefd7659c2f5aef1c Author: Josh Elser <els...@apache.org> AuthorDate: Fri Jun 21 16:29:08 2019 -0400 HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud) Amending-Author: Josh Elser <els...@apache.org> Signed-off-by: Andrew Purtell <apurt...@apache.org> Signed-off-by: Josh Elser <els...@apache.org> --- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 38 ++++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a32040c..efc7f89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -342,6 +342,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private User user; private InetAddress remoteAddress; + private boolean saslWrapDone; private long responseCellSize = 0; private long responseBlockSize = 0; @@ -369,6 +370,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.tinfo = tinfo; this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH this.remoteAddress = remoteAddress; + this.saslWrapDone = false; this.retryImmediatelySupported = connection == null? null: connection.retryImmediatelySupported; this.timeout = timeout; @@ -478,10 +480,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { byte[] b = createHeaderAndMessageBytes(result, header); bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock); - - if (connection.useWrap) { - bc = wrapWithSasl(bc); - } } catch (IOException e) { LOG.warn("Exception while creating response " + e); } @@ -526,6 +524,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return b; } + private synchronized void wrapWithSasl() throws IOException { + // do it only once per call + if (saslWrapDone) { + return; + } + response = wrapWithSasl(response); + saslWrapDone = true; + } + + /** + * Do not call directly, invoke via {@link #wrapWithSasl()}. + */ private BufferChain wrapWithSasl(BufferChain bc) throws IOException { if (!this.connection.useSasl) return bc; @@ -533,11 +543,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // THIS IS A BIG UGLY COPY. byte [] responseBytes = bc.getBytes(); byte [] token; - // synchronization may be needed since there can be multiple Handler - // threads using saslServer to wrap responses. - synchronized (connection.saslServer) { - token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); - } + + // Previously, synchronization was needed since there could be multiple Handler + // threads using saslServer to wrap responses. However, now we wrap the response + // inside of the Responder thread to avoid sending back mis-ordered SASL messages. + token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); if (LOG.isTraceEnabled()) { LOG.trace("Adding saslServer wrapped token of size " + token.length + " as call response."); @@ -1188,6 +1198,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private boolean processResponse(final Call call) throws IOException { boolean error = true; try { + // Wrap the message "late" in SASL to ensure that the sequence number matches the order of + // responses we write out. + if (call.connection.useWrap) { + call.wrapWithSasl(); + } // Send as much data as we can in the non-blocking fashion long numBytes = channelWrite(call.connection.channel, call.response); if (numBytes < 0) { @@ -1220,6 +1235,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ private boolean processAllResponses(final Connection connection) throws IOException { // We want only one writer on the channel for a connection at a time. + boolean isEmpty = false; connection.responseWriteLock.lock(); try { for (int i = 0; i < 20; i++) { @@ -1233,11 +1249,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return false; } } + // Check that state within the lock to be consistent + isEmpty = connection.responseQueue.isEmpty(); } finally { connection.responseWriteLock.unlock(); } - return connection.responseQueue.isEmpty(); + return isEmpty; } //