http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a8725a46/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.CallSender.html
----------------------------------------------------------------------
diff --git
a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.CallSender.html
b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.CallSender.html
index b59d17d..caf98ca 100644
---
a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.CallSender.html
+++
b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.CallSender.html
@@ -907,428 +907,430 @@
<span class="sourceLineNo">899</span>
cellBlockBuilder.setLength(cellBlock.limit());<a name="line.899"></a>
<span class="sourceLineNo">900</span>
builder.setCellBlockMeta(cellBlockBuilder.build());<a name="line.900"></a>
<span class="sourceLineNo">901</span> }<a name="line.901"></a>
-<span class="sourceLineNo">902</span> // Only pass priority if there one.
Let zero be same as no priority.<a name="line.902"></a>
-<span class="sourceLineNo">903</span> if (priority != 0)
builder.setPriority(priority);<a name="line.903"></a>
-<span class="sourceLineNo">904</span> RequestHeader header =
builder.build();<a name="line.904"></a>
-<span class="sourceLineNo">905</span><a name="line.905"></a>
-<span class="sourceLineNo">906</span> setupIOstreams();<a
name="line.906"></a>
+<span class="sourceLineNo">902</span> // Only pass priority if there is
one set.<a name="line.902"></a>
+<span class="sourceLineNo">903</span> if (priority !=
PayloadCarryingRpcController.PRIORITY_UNSET) {<a name="line.903"></a>
+<span class="sourceLineNo">904</span> builder.setPriority(priority);<a
name="line.904"></a>
+<span class="sourceLineNo">905</span> }<a name="line.905"></a>
+<span class="sourceLineNo">906</span> RequestHeader header =
builder.build();<a name="line.906"></a>
<span class="sourceLineNo">907</span><a name="line.907"></a>
-<span class="sourceLineNo">908</span> // Now we're going to write the
call. We take the lock, then check that the connection<a name="line.908"></a>
-<span class="sourceLineNo">909</span> // is still valid, and, if so we
do the write to the socket. If the write fails, we don't<a name="line.909"></a>
-<span class="sourceLineNo">910</span> // know where we stand, we have to
close the connection.<a name="line.910"></a>
-<span class="sourceLineNo">911</span> checkIsOpen();<a
name="line.911"></a>
-<span class="sourceLineNo">912</span> IOException writeException =
null;<a name="line.912"></a>
-<span class="sourceLineNo">913</span> synchronized (this.outLock) {<a
name="line.913"></a>
-<span class="sourceLineNo">914</span> if (Thread.interrupted()) throw
new InterruptedIOException();<a name="line.914"></a>
-<span class="sourceLineNo">915</span><a name="line.915"></a>
-<span class="sourceLineNo">916</span> calls.put(call.id, call); // We
put first as we don't want the connection to become idle.<a name="line.916"></a>
-<span class="sourceLineNo">917</span> checkIsOpen(); // Now we're
checking that it didn't became idle in between.<a name="line.917"></a>
-<span class="sourceLineNo">918</span><a name="line.918"></a>
-<span class="sourceLineNo">919</span> try {<a name="line.919"></a>
-<span class="sourceLineNo">920</span>
call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header,
call.param,<a name="line.920"></a>
-<span class="sourceLineNo">921</span> cellBlock));<a
name="line.921"></a>
-<span class="sourceLineNo">922</span> } catch (IOException e) {<a
name="line.922"></a>
-<span class="sourceLineNo">923</span> // We set the value inside the
synchronized block, this way the next in line<a name="line.923"></a>
-<span class="sourceLineNo">924</span> // won't even try to write.
Otherwise we might miss a call in the calls map?<a name="line.924"></a>
-<span class="sourceLineNo">925</span>
shouldCloseConnection.set(true);<a name="line.925"></a>
-<span class="sourceLineNo">926</span> writeException = e;<a
name="line.926"></a>
-<span class="sourceLineNo">927</span> interrupt();<a
name="line.927"></a>
-<span class="sourceLineNo">928</span> }<a name="line.928"></a>
-<span class="sourceLineNo">929</span> }<a name="line.929"></a>
-<span class="sourceLineNo">930</span><a name="line.930"></a>
-<span class="sourceLineNo">931</span> // call close outside of the
synchronized (outLock) to prevent deadlock - HBASE-14474<a name="line.931"></a>
-<span class="sourceLineNo">932</span> if (writeException != null) {<a
name="line.932"></a>
-<span class="sourceLineNo">933</span> markClosed(writeException);<a
name="line.933"></a>
-<span class="sourceLineNo">934</span> close();<a name="line.934"></a>
-<span class="sourceLineNo">935</span> }<a name="line.935"></a>
-<span class="sourceLineNo">936</span><a name="line.936"></a>
-<span class="sourceLineNo">937</span> // We added a call, and may be
started the connection close. In both cases, we<a name="line.937"></a>
-<span class="sourceLineNo">938</span> // need to notify the reader.<a
name="line.938"></a>
-<span class="sourceLineNo">939</span> doNotify();<a name="line.939"></a>
-<span class="sourceLineNo">940</span><a name="line.940"></a>
-<span class="sourceLineNo">941</span> // Now that we notified, we can
rethrow the exception if any. Otherwise we're good.<a name="line.941"></a>
-<span class="sourceLineNo">942</span> if (writeException != null) throw
writeException;<a name="line.942"></a>
-<span class="sourceLineNo">943</span> }<a name="line.943"></a>
-<span class="sourceLineNo">944</span><a name="line.944"></a>
-<span class="sourceLineNo">945</span>
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",<a
name="line.945"></a>
-<span class="sourceLineNo">946</span> justification="Presume notifyAll
is because we are closing/shutting down")<a name="line.946"></a>
-<span class="sourceLineNo">947</span> private synchronized void doNotify()
{<a name="line.947"></a>
-<span class="sourceLineNo">948</span> // Make a separate method so can do
synchronize and add findbugs annotation; only one<a name="line.948"></a>
-<span class="sourceLineNo">949</span> // annotation at at time in source
1.7.<a name="line.949"></a>
-<span class="sourceLineNo">950</span> notifyAll(); // Findbugs:
NN_NAKED_NOTIFY<a name="line.950"></a>
-<span class="sourceLineNo">951</span> }<a name="line.951"></a>
-<span class="sourceLineNo">952</span><a name="line.952"></a>
-<span class="sourceLineNo">953</span> /* Receive a response.<a
name="line.953"></a>
-<span class="sourceLineNo">954</span> * Because only one receiver, so no
synchronization on in.<a name="line.954"></a>
-<span class="sourceLineNo">955</span> */<a name="line.955"></a>
-<span class="sourceLineNo">956</span> protected void readResponse() {<a
name="line.956"></a>
-<span class="sourceLineNo">957</span> if (shouldCloseConnection.get())
return;<a name="line.957"></a>
-<span class="sourceLineNo">958</span> Call call = null;<a
name="line.958"></a>
-<span class="sourceLineNo">959</span> boolean expectedCall = false;<a
name="line.959"></a>
-<span class="sourceLineNo">960</span> try {<a name="line.960"></a>
-<span class="sourceLineNo">961</span> // See
HBaseServer.Call.setResponse for where we write out the response.<a
name="line.961"></a>
-<span class="sourceLineNo">962</span> // Total size of the response.
Unused. But have to read it in anyways.<a name="line.962"></a>
-<span class="sourceLineNo">963</span> int totalSize = in.readInt();<a
name="line.963"></a>
-<span class="sourceLineNo">964</span><a name="line.964"></a>
-<span class="sourceLineNo">965</span> // Read the header<a
name="line.965"></a>
-<span class="sourceLineNo">966</span> ResponseHeader responseHeader =
ResponseHeader.parseDelimitedFrom(in);<a name="line.966"></a>
-<span class="sourceLineNo">967</span> int id =
responseHeader.getCallId();<a name="line.967"></a>
-<span class="sourceLineNo">968</span> call = calls.remove(id); //
call.done have to be set before leaving this method<a name="line.968"></a>
-<span class="sourceLineNo">969</span> expectedCall = (call != null
&& !call.done);<a name="line.969"></a>
-<span class="sourceLineNo">970</span> if (!expectedCall) {<a
name="line.970"></a>
-<span class="sourceLineNo">971</span> // So we got a response for
which we have no corresponding 'call' here on the client-side.<a
name="line.971"></a>
-<span class="sourceLineNo">972</span> // We probably timed out
waiting, cleaned up all references, and now the server decides<a
name="line.972"></a>
-<span class="sourceLineNo">973</span> // to return a response. There
is nothing we can do w/ the response at this stage. Clean<a name="line.973"></a>
-<span class="sourceLineNo">974</span> // out the wire of the response
so its out of the way and we can get other responses on<a name="line.974"></a>
-<span class="sourceLineNo">975</span> // this connection.<a
name="line.975"></a>
-<span class="sourceLineNo">976</span> int readSoFar =
IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);<a name="line.976"></a>
-<span class="sourceLineNo">977</span> int whatIsLeftToRead =
totalSize - readSoFar;<a name="line.977"></a>
-<span class="sourceLineNo">978</span> IOUtils.skipFully(in,
whatIsLeftToRead);<a name="line.978"></a>
-<span class="sourceLineNo">979</span> if (call != null) {<a
name="line.979"></a>
-<span class="sourceLineNo">980</span>
call.callStats.setResponseSizeBytes(totalSize);<a name="line.980"></a>
-<span class="sourceLineNo">981</span>
call.callStats.setCallTimeMs(<a name="line.981"></a>
-<span class="sourceLineNo">982</span>
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a
name="line.982"></a>
-<span class="sourceLineNo">983</span> }<a name="line.983"></a>
-<span class="sourceLineNo">984</span> return;<a name="line.984"></a>
-<span class="sourceLineNo">985</span> }<a name="line.985"></a>
-<span class="sourceLineNo">986</span> if
(responseHeader.hasException()) {<a name="line.986"></a>
-<span class="sourceLineNo">987</span> ExceptionResponse
exceptionResponse = responseHeader.getException();<a name="line.987"></a>
-<span class="sourceLineNo">988</span> RemoteException re =
createRemoteException(exceptionResponse);<a name="line.988"></a>
-<span class="sourceLineNo">989</span> call.setException(re);<a
name="line.989"></a>
-<span class="sourceLineNo">990</span>
call.callStats.setResponseSizeBytes(totalSize);<a name="line.990"></a>
-<span class="sourceLineNo">991</span> call.callStats.setCallTimeMs(<a
name="line.991"></a>
-<span class="sourceLineNo">992</span>
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a
name="line.992"></a>
-<span class="sourceLineNo">993</span> if
(isFatalConnectionException(exceptionResponse)) {<a name="line.993"></a>
-<span class="sourceLineNo">994</span> markClosed(re);<a
name="line.994"></a>
-<span class="sourceLineNo">995</span> }<a name="line.995"></a>
-<span class="sourceLineNo">996</span> } else {<a name="line.996"></a>
-<span class="sourceLineNo">997</span> Message value = null;<a
name="line.997"></a>
-<span class="sourceLineNo">998</span> if (call.responseDefaultType !=
null) {<a name="line.998"></a>
-<span class="sourceLineNo">999</span> Builder builder =
call.responseDefaultType.newBuilderForType();<a name="line.999"></a>
-<span class="sourceLineNo">1000</span>
ProtobufUtil.mergeDelimitedFrom(builder, in);<a name="line.1000"></a>
-<span class="sourceLineNo">1001</span> value = builder.build();<a
name="line.1001"></a>
-<span class="sourceLineNo">1002</span> }<a name="line.1002"></a>
-<span class="sourceLineNo">1003</span> CellScanner cellBlockScanner =
null;<a name="line.1003"></a>
-<span class="sourceLineNo">1004</span> if
(responseHeader.hasCellBlockMeta()) {<a name="line.1004"></a>
-<span class="sourceLineNo">1005</span> int size =
responseHeader.getCellBlockMeta().getLength();<a name="line.1005"></a>
-<span class="sourceLineNo">1006</span> byte [] cellBlock = new
byte[size];<a name="line.1006"></a>
-<span class="sourceLineNo">1007</span> IOUtils.readFully(this.in,
cellBlock, 0, cellBlock.length);<a name="line.1007"></a>
-<span class="sourceLineNo">1008</span> cellBlockScanner =
ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);<a
name="line.1008"></a>
-<span class="sourceLineNo">1009</span> }<a name="line.1009"></a>
-<span class="sourceLineNo">1010</span> call.setResponse(value,
cellBlockScanner);<a name="line.1010"></a>
-<span class="sourceLineNo">1011</span>
call.callStats.setResponseSizeBytes(totalSize);<a name="line.1011"></a>
-<span class="sourceLineNo">1012</span>
call.callStats.setCallTimeMs(<a name="line.1012"></a>
-<span class="sourceLineNo">1013</span>
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a
name="line.1013"></a>
-<span class="sourceLineNo">1014</span> }<a name="line.1014"></a>
-<span class="sourceLineNo">1015</span> } catch (IOException e) {<a
name="line.1015"></a>
-<span class="sourceLineNo">1016</span> if (expectedCall)
call.setException(e);<a name="line.1016"></a>
-<span class="sourceLineNo">1017</span> if (e instanceof
SocketTimeoutException) {<a name="line.1017"></a>
-<span class="sourceLineNo">1018</span> // Clean up open calls but
don't treat this as a fatal condition,<a name="line.1018"></a>
-<span class="sourceLineNo">1019</span> // since we expect certain
responses to not make it by the specified<a name="line.1019"></a>
-<span class="sourceLineNo">1020</span> // {@link
ConnectionId#rpcTimeout}.<a name="line.1020"></a>
-<span class="sourceLineNo">1021</span> if (LOG.isTraceEnabled())
LOG.trace("ignored", e);<a name="line.1021"></a>
-<span class="sourceLineNo">1022</span> } else {<a name="line.1022"></a>
-<span class="sourceLineNo">1023</span> // Treat this as a fatal
condition and close this connection<a name="line.1023"></a>
-<span class="sourceLineNo">1024</span> markClosed(e);<a
name="line.1024"></a>
-<span class="sourceLineNo">1025</span> }<a name="line.1025"></a>
-<span class="sourceLineNo">1026</span> } finally {<a name="line.1026"></a>
-<span class="sourceLineNo">1027</span> cleanupCalls(false);<a
name="line.1027"></a>
-<span class="sourceLineNo">1028</span> }<a name="line.1028"></a>
-<span class="sourceLineNo">1029</span> }<a name="line.1029"></a>
-<span class="sourceLineNo">1030</span><a name="line.1030"></a>
-<span class="sourceLineNo">1031</span> /**<a name="line.1031"></a>
-<span class="sourceLineNo">1032</span> * @return True if the exception is
a fatal connection exception.<a name="line.1032"></a>
-<span class="sourceLineNo">1033</span> */<a name="line.1033"></a>
-<span class="sourceLineNo">1034</span> private boolean
isFatalConnectionException(final ExceptionResponse e) {<a name="line.1034"></a>
-<span class="sourceLineNo">1035</span> return
e.getExceptionClassName().<a name="line.1035"></a>
-<span class="sourceLineNo">1036</span>
equals(FatalConnectionException.class.getName());<a name="line.1036"></a>
-<span class="sourceLineNo">1037</span> }<a name="line.1037"></a>
-<span class="sourceLineNo">1038</span><a name="line.1038"></a>
-<span class="sourceLineNo">1039</span> /**<a name="line.1039"></a>
-<span class="sourceLineNo">1040</span> * @param e exception to be
wrapped<a name="line.1040"></a>
-<span class="sourceLineNo">1041</span> * @return RemoteException made from
passed <code>e</code><a name="line.1041"></a>
-<span class="sourceLineNo">1042</span> */<a name="line.1042"></a>
-<span class="sourceLineNo">1043</span> private RemoteException
createRemoteException(final ExceptionResponse e) {<a name="line.1043"></a>
-<span class="sourceLineNo">1044</span> String innerExceptionClassName =
e.getExceptionClassName();<a name="line.1044"></a>
-<span class="sourceLineNo">1045</span> boolean doNotRetry =
e.getDoNotRetry();<a name="line.1045"></a>
-<span class="sourceLineNo">1046</span> return e.hasHostname()?<a
name="line.1046"></a>
-<span class="sourceLineNo">1047</span> // If a hostname then add it to
the RemoteWithExtrasException<a name="line.1047"></a>
-<span class="sourceLineNo">1048</span> new
RemoteWithExtrasException(innerExceptionClassName,<a name="line.1048"></a>
-<span class="sourceLineNo">1049</span> e.getStackTrace(),
e.getHostname(), e.getPort(), doNotRetry):<a name="line.1049"></a>
+<span class="sourceLineNo">908</span> setupIOstreams();<a
name="line.908"></a>
+<span class="sourceLineNo">909</span><a name="line.909"></a>
+<span class="sourceLineNo">910</span> // Now we're going to write the
call. We take the lock, then check that the connection<a name="line.910"></a>
+<span class="sourceLineNo">911</span> // is still valid, and, if so we
do the write to the socket. If the write fails, we don't<a name="line.911"></a>
+<span class="sourceLineNo">912</span> // know where we stand, we have to
close the connection.<a name="line.912"></a>
+<span class="sourceLineNo">913</span> checkIsOpen();<a
name="line.913"></a>
+<span class="sourceLineNo">914</span> IOException writeException =
null;<a name="line.914"></a>
+<span class="sourceLineNo">915</span> synchronized (this.outLock) {<a
name="line.915"></a>
+<span class="sourceLineNo">916</span> if (Thread.interrupted()) throw
new InterruptedIOException();<a name="line.916"></a>
+<span class="sourceLineNo">917</span><a name="line.917"></a>
+<span class="sourceLineNo">918</span> calls.put(call.id, call); // We
put first as we don't want the connection to become idle.<a name="line.918"></a>
+<span class="sourceLineNo">919</span> checkIsOpen(); // Now we're
checking that it didn't became idle in between.<a name="line.919"></a>
+<span class="sourceLineNo">920</span><a name="line.920"></a>
+<span class="sourceLineNo">921</span> try {<a name="line.921"></a>
+<span class="sourceLineNo">922</span>
call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header,
call.param,<a name="line.922"></a>
+<span class="sourceLineNo">923</span> cellBlock));<a
name="line.923"></a>
+<span class="sourceLineNo">924</span> } catch (IOException e) {<a
name="line.924"></a>
+<span class="sourceLineNo">925</span> // We set the value inside the
synchronized block, this way the next in line<a name="line.925"></a>
+<span class="sourceLineNo">926</span> // won't even try to write.
Otherwise we might miss a call in the calls map?<a name="line.926"></a>
+<span class="sourceLineNo">927</span>
shouldCloseConnection.set(true);<a name="line.927"></a>
+<span class="sourceLineNo">928</span> writeException = e;<a
name="line.928"></a>
+<span class="sourceLineNo">929</span> interrupt();<a
name="line.929"></a>
+<span class="sourceLineNo">930</span> }<a name="line.930"></a>
+<span class="sourceLineNo">931</span> }<a name="line.931"></a>
+<span class="sourceLineNo">932</span><a name="line.932"></a>
+<span class="sourceLineNo">933</span> // call close outside of the
synchronized (outLock) to prevent deadlock - HBASE-14474<a name="line.933"></a>
+<span class="sourceLineNo">934</span> if (writeException != null) {<a
name="line.934"></a>
+<span class="sourceLineNo">935</span> markClosed(writeException);<a
name="line.935"></a>
+<span class="sourceLineNo">936</span> close();<a name="line.936"></a>
+<span class="sourceLineNo">937</span> }<a name="line.937"></a>
+<span class="sourceLineNo">938</span><a name="line.938"></a>
+<span class="sourceLineNo">939</span> // We added a call, and may be
started the connection close. In both cases, we<a name="line.939"></a>
+<span class="sourceLineNo">940</span> // need to notify the reader.<a
name="line.940"></a>
+<span class="sourceLineNo">941</span> doNotify();<a name="line.941"></a>
+<span class="sourceLineNo">942</span><a name="line.942"></a>
+<span class="sourceLineNo">943</span> // Now that we notified, we can
rethrow the exception if any. Otherwise we're good.<a name="line.943"></a>
+<span class="sourceLineNo">944</span> if (writeException != null) throw
writeException;<a name="line.944"></a>
+<span class="sourceLineNo">945</span> }<a name="line.945"></a>
+<span class="sourceLineNo">946</span><a name="line.946"></a>
+<span class="sourceLineNo">947</span>
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",<a
name="line.947"></a>
+<span class="sourceLineNo">948</span> justification="Presume notifyAll
is because we are closing/shutting down")<a name="line.948"></a>
+<span class="sourceLineNo">949</span> private synchronized void doNotify()
{<a name="line.949"></a>
+<span class="sourceLineNo">950</span> // Make a separate method so can do
synchronize and add findbugs annotation; only one<a name="line.950"></a>
+<span class="sourceLineNo">951</span> // annotation at at time in source
1.7.<a name="line.951"></a>
+<span class="sourceLineNo">952</span> notifyAll(); // Findbugs:
NN_NAKED_NOTIFY<a name="line.952"></a>
+<span class="sourceLineNo">953</span> }<a name="line.953"></a>
+<span class="sourceLineNo">954</span><a name="line.954"></a>
+<span class="sourceLineNo">955</span> /* Receive a response.<a
name="line.955"></a>
+<span class="sourceLineNo">956</span> * Because only one receiver, so no
synchronization on in.<a name="line.956"></a>
+<span class="sourceLineNo">957</span> */<a name="line.957"></a>
+<span class="sourceLineNo">958</span> protected void readResponse() {<a
name="line.958"></a>
+<span class="sourceLineNo">959</span> if (shouldCloseConnection.get())
return;<a name="line.959"></a>
+<span class="sourceLineNo">960</span> Call call = null;<a
name="line.960"></a>
+<span class="sourceLineNo">961</span> boolean expectedCall = false;<a
name="line.961"></a>
+<span class="sourceLineNo">962</span> try {<a name="line.962"></a>
+<span class="sourceLineNo">963</span> // See
HBaseServer.Call.setResponse for where we write out the response.<a
name="line.963"></a>
+<span class="sourceLineNo">964</span> // Total size of the response.
Unused. But have to read it in anyways.<a name="line.964"></a>
+<span class="sourceLineNo">965</span> int totalSize = in.readInt();<a
name="line.965"></a>
+<span class="sourceLineNo">966</span><a name="line.966"></a>
+<span class="sourceLineNo">967</span> // Read the header<a
name="line.967"></a>
+<span class="sourceLineNo">968</span> ResponseHeader responseHeader =
ResponseHeader.parseDelimitedFrom(in);<a name="line.968"></a>
+<span class="sourceLineNo">969</span> int id =
responseHeader.getCallId();<a name="line.969"></a>
+<span class="sourceLineNo">970</span> call = calls.remove(id); //
call.done have to be set before leaving this method<a name="line.970"></a>
+<span class="sourceLineNo">971</span> expectedCall = (call != null
&& !call.done);<a name="line.971"></a>
+<span class="sourceLineNo">972</span> if (!expectedCall) {<a
name="line.972"></a>
+<span class="sourceLineNo">973</span> // So we got a response for
which we have no corresponding 'call' here on the client-side.<a
name="line.973"></a>
+<span class="sourceLineNo">974</span> // We probably timed out
waiting, cleaned up all references, and now the server decides<a
name="line.974"></a>
+<span class="sourceLineNo">975</span> // to return a response. There
is nothing we can do w/ the response at this stage. Clean<a name="line.975"></a>
+<span class="sourceLineNo">976</span> // out the wire of the response
so its out of the way and we can get other responses on<a name="line.976"></a>
+<span class="sourceLineNo">977</span> // this connection.<a
name="line.977"></a>
+<span class="sourceLineNo">978</span> int readSoFar =
IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);<a name="line.978"></a>
+<span class="sourceLineNo">979</span> int whatIsLeftToRead =
totalSize - readSoFar;<a name="line.979"></a>
+<span class="sourceLineNo">980</span> IOUtils.skipFully(in,
whatIsLeftToRead);<a name="line.980"></a>
+<span class="sourceLineNo">981</span> if (call != null) {<a
name="line.981"></a>
+<span class="sourceLineNo">982</span>
call.callStats.setResponseSizeBytes(totalSize);<a name="line.982"></a>
+<span class="sourceLineNo">983</span>
call.callStats.setCallTimeMs(<a name="line.983"></a>
+<span class="sourceLineNo">984</span>
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a
name="line.984"></a>
+<span class="sourceLineNo">985</span> }<a name="line.985"></a>
+<span class="sourceLineNo">986</span> return;<a name="line.986"></a>
+<span class="sourceLineNo">987</span> }<a name="line.987"></a>
+<span class="sourceLineNo">988</span> if
(responseHeader.hasException()) {<a name="line.988"></a>
+<span class="sourceLineNo">989</span> ExceptionResponse
exceptionResponse = responseHeader.getException();<a name="line.989"></a>
+<span class="sourceLineNo">990</span> RemoteException re =
createRemoteException(exceptionResponse);<a name="line.990"></a>
+<span class="sourceLineNo">991</span> call.setException(re);<a
name="line.991"></a>
+<span class="sourceLineNo">992</span>
call.callStats.setResponseSizeBytes(totalSize);<a name="line.992"></a>
+<span class="sourceLineNo">993</span> call.callStats.setCallTimeMs(<a
name="line.993"></a>
+<span class="sourceLineNo">994</span>
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a
name="line.994"></a>
+<span class="sourceLineNo">995</span> if
(isFatalConnectionException(exceptionResponse)) {<a name="line.995"></a>
+<span class="sourceLineNo">996</span> markClosed(re);<a
name="line.996"></a>
+<span class="sourceLineNo">997</span> }<a name="line.997"></a>
+<span class="sourceLineNo">998</span> } else {<a name="line.998"></a>
+<span class="sourceLineNo">999</span> Message value = null;<a
name="line.999"></a>
+<span class="sourceLineNo">1000</span> if (call.responseDefaultType
!= null) {<a name="line.1000"></a>
+<span class="sourceLineNo">1001</span> Builder builder =
call.responseDefaultType.newBuilderForType();<a name="line.1001"></a>
+<span class="sourceLineNo">1002</span>
ProtobufUtil.mergeDelimitedFrom(builder, in);<a name="line.1002"></a>
+<span class="sourceLineNo">1003</span> value = builder.build();<a
name="line.1003"></a>
+<span class="sourceLineNo">1004</span> }<a name="line.1004"></a>
+<span class="sourceLineNo">1005</span> CellScanner cellBlockScanner =
null;<a name="line.1005"></a>
+<span class="sourceLineNo">1006</span> if
(responseHeader.hasCellBlockMeta()) {<a name="line.1006"></a>
+<span class="sourceLineNo">1007</span> int size =
responseHeader.getCellBlockMeta().getLength();<a name="line.1007"></a>
+<span class="sourceLineNo">1008</span> byte [] cellBlock = new
byte[size];<a name="line.1008"></a>
+<span class="sourceLineNo">1009</span> IOUtils.readFully(this.in,
cellBlock, 0, cellBlock.length);<a name="line.1009"></a>
+<span class="sourceLineNo">1010</span> cellBlockScanner =
ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);<a
name="line.1010"></a>
+<span class="sourceLineNo">1011</span> }<a name="line.1011"></a>
+<span class="sourceLineNo">1012</span> call.setResponse(value,
cellBlockScanner);<a name="line.1012"></a>
+<span class="sourceLineNo">1013</span>
call.callStats.setResponseSizeBytes(totalSize);<a name="line.1013"></a>
+<span class="sourceLineNo">1014</span>
call.callStats.setCallTimeMs(<a name="line.1014"></a>
+<span class="sourceLineNo">1015</span>
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a
name="line.1015"></a>
+<span class="sourceLineNo">1016</span> }<a name="line.1016"></a>
+<span class="sourceLineNo">1017</span> } catch (IOException e) {<a
name="line.1017"></a>
+<span class="sourceLineNo">1018</span> if (expectedCall)
call.setException(e);<a name="line.1018"></a>
+<span class="sourceLineNo">1019</span> if (e instanceof
SocketTimeoutException) {<a name="line.1019"></a>
+<span class="sourceLineNo">1020</span> // Clean up open calls but
don't treat this as a fatal condition,<a name="line.1020"></a>
+<span class="sourceLineNo">1021</span> // since we expect certain
responses to not make it by the specified<a name="line.1021"></a>
+<span class="sourceLineNo">1022</span> // {@link
ConnectionId#rpcTimeout}.<a name="line.1022"></a>
+<span class="sourceLineNo">1023</span> if (LOG.isTraceEnabled())
LOG.trace("ignored", e);<a name="line.1023"></a>
+<span class="sourceLineNo">1024</span> } else {<a name="line.1024"></a>
+<span class="sourceLineNo">1025</span> // Treat this as a fatal
condition and close this connection<a name="line.1025"></a>
+<span class="sourceLineNo">1026</span> markClosed(e);<a
name="line.1026"></a>
+<span class="sourceLineNo">1027</span> }<a name="line.1027"></a>
+<span class="sourceLineNo">1028</span> } finally {<a name="line.1028"></a>
+<span class="sourceLineNo">1029</span> cleanupCalls(false);<a
name="line.1029"></a>
+<span class="sourceLineNo">1030</span> }<a name="line.1030"></a>
+<span class="sourceLineNo">1031</span> }<a name="line.1031"></a>
+<span class="sourceLineNo">1032</span><a name="line.1032"></a>
+<span class="sourceLineNo">1033</span> /**<a name="line.1033"></a>
+<span class="sourceLineNo">1034</span> * @return True if the exception is
a fatal connection exception.<a name="line.1034"></a>
+<span class="sourceLineNo">1035</span> */<a name="line.1035"></a>
+<span class="sourceLineNo">1036</span> private boolean
isFatalConnectionException(final ExceptionResponse e) {<a name="line.1036"></a>
+<span class="sourceLineNo">1037</span> return
e.getExceptionClassName().<a name="line.1037"></a>
+<span class="sourceLineNo">1038</span>
equals(FatalConnectionException.class.getName());<a name="line.1038"></a>
+<span class="sourceLineNo">1039</span> }<a name="line.1039"></a>
+<span class="sourceLineNo">1040</span><a name="line.1040"></a>
+<span class="sourceLineNo">1041</span> /**<a name="line.1041"></a>
+<span class="sourceLineNo">1042</span> * @param e exception to be
wrapped<a name="line.1042"></a>
+<span class="sourceLineNo">1043</span> * @return RemoteException made from
passed <code>e</code><a name="line.1043"></a>
+<span class="sourceLineNo">1044</span> */<a name="line.1044"></a>
+<span class="sourceLineNo">1045</span> private RemoteException
createRemoteException(final ExceptionResponse e) {<a name="line.1045"></a>
+<span class="sourceLineNo">1046</span> String innerExceptionClassName =
e.getExceptionClassName();<a name="line.1046"></a>
+<span class="sourceLineNo">1047</span> boolean doNotRetry =
e.getDoNotRetry();<a name="line.1047"></a>
+<span class="sourceLineNo">1048</span> return e.hasHostname()?<a
name="line.1048"></a>
+<span class="sourceLineNo">1049</span> // If a hostname then add it to
the RemoteWithExtrasException<a name="line.1049"></a>
<span class="sourceLineNo">1050</span> new
RemoteWithExtrasException(innerExceptionClassName,<a name="line.1050"></a>
-<span class="sourceLineNo">1051</span> e.getStackTrace(),
doNotRetry);<a name="line.1051"></a>
-<span class="sourceLineNo">1052</span> }<a name="line.1052"></a>
-<span class="sourceLineNo">1053</span><a name="line.1053"></a>
-<span class="sourceLineNo">1054</span> protected synchronized boolean
markClosed(IOException e) {<a name="line.1054"></a>
-<span class="sourceLineNo">1055</span> if (e == null) throw new
NullPointerException();<a name="line.1055"></a>
-<span class="sourceLineNo">1056</span><a name="line.1056"></a>
-<span class="sourceLineNo">1057</span> boolean ret =
shouldCloseConnection.compareAndSet(false, true);<a name="line.1057"></a>
-<span class="sourceLineNo">1058</span> if (ret) {<a name="line.1058"></a>
-<span class="sourceLineNo">1059</span> if (LOG.isTraceEnabled()) {<a
name="line.1059"></a>
-<span class="sourceLineNo">1060</span> LOG.trace(getName() + ":
marking at should close, reason: " + e.getMessage());<a name="line.1060"></a>
-<span class="sourceLineNo">1061</span> }<a name="line.1061"></a>
-<span class="sourceLineNo">1062</span> if (callSender != null) {<a
name="line.1062"></a>
-<span class="sourceLineNo">1063</span> callSender.close();<a
name="line.1063"></a>
-<span class="sourceLineNo">1064</span> }<a name="line.1064"></a>
-<span class="sourceLineNo">1065</span> notifyAll();<a
name="line.1065"></a>
-<span class="sourceLineNo">1066</span> }<a name="line.1066"></a>
-<span class="sourceLineNo">1067</span> return ret;<a name="line.1067"></a>
-<span class="sourceLineNo">1068</span> }<a name="line.1068"></a>
-<span class="sourceLineNo">1069</span><a name="line.1069"></a>
-<span class="sourceLineNo">1070</span><a name="line.1070"></a>
-<span class="sourceLineNo">1071</span> /**<a name="line.1071"></a>
-<span class="sourceLineNo">1072</span> * Cleanup the calls older than a
given timeout, in milli seconds.<a name="line.1072"></a>
-<span class="sourceLineNo">1073</span> * @param allCalls true for all
calls, false for only the calls in timeout<a name="line.1073"></a>
-<span class="sourceLineNo">1074</span> */<a name="line.1074"></a>
-<span class="sourceLineNo">1075</span> protected synchronized void
cleanupCalls(boolean allCalls) {<a name="line.1075"></a>
-<span class="sourceLineNo">1076</span> Iterator<Entry<Integer,
Call>> itor = calls.entrySet().iterator();<a name="line.1076"></a>
-<span class="sourceLineNo">1077</span> while (itor.hasNext()) {<a
name="line.1077"></a>
-<span class="sourceLineNo">1078</span> Call c =
itor.next().getValue();<a name="line.1078"></a>
-<span class="sourceLineNo">1079</span> if (c.done) {<a
name="line.1079"></a>
-<span class="sourceLineNo">1080</span> // To catch the calls without
timeout that were cancelled.<a name="line.1080"></a>
-<span class="sourceLineNo">1081</span> itor.remove();<a
name="line.1081"></a>
-<span class="sourceLineNo">1082</span> } else if (allCalls) {<a
name="line.1082"></a>
-<span class="sourceLineNo">1083</span> long waitTime =
EnvironmentEdgeManager.currentTime() - c.getStartTime();<a name="line.1083"></a>
-<span class="sourceLineNo">1084</span> IOException ie = new
ConnectionClosingException("Connection to " + getRemoteAddress()<a
name="line.1084"></a>
-<span class="sourceLineNo">1085</span> + " is closing. Call id="
+ c.id + ", waitTime=" + waitTime);<a name="line.1085"></a>
-<span class="sourceLineNo">1086</span> c.setException(ie);<a
name="line.1086"></a>
-<span class="sourceLineNo">1087</span> itor.remove();<a
name="line.1087"></a>
-<span class="sourceLineNo">1088</span> } else if
(c.checkAndSetTimeout()) {<a name="line.1088"></a>
+<span class="sourceLineNo">1051</span> e.getStackTrace(),
e.getHostname(), e.getPort(), doNotRetry):<a name="line.1051"></a>
+<span class="sourceLineNo">1052</span> new
RemoteWithExtrasException(innerExceptionClassName,<a name="line.1052"></a>
+<span class="sourceLineNo">1053</span> e.getStackTrace(),
doNotRetry);<a name="line.1053"></a>
+<span class="sourceLineNo">1054</span> }<a name="line.1054"></a>
+<span class="sourceLineNo">1055</span><a name="line.1055"></a>
+<span class="sourceLineNo">1056</span> protected synchronized boolean
markClosed(IOException e) {<a name="line.1056"></a>
+<span class="sourceLineNo">1057</span> if (e == null) throw new
NullPointerException();<a name="line.1057"></a>
+<span class="sourceLineNo">1058</span><a name="line.1058"></a>
+<span class="sourceLineNo">1059</span> boolean ret =
shouldCloseConnection.compareAndSet(false, true);<a name="line.1059"></a>
+<span class="sourceLineNo">1060</span> if (ret) {<a name="line.1060"></a>
+<span class="sourceLineNo">1061</span> if (LOG.isTraceEnabled()) {<a
name="line.1061"></a>
+<span class="sourceLineNo">1062</span> LOG.trace(getName() + ":
marking at should close, reason: " + e.getMessage());<a name="line.1062"></a>
+<span class="sourceLineNo">1063</span> }<a name="line.1063"></a>
+<span class="sourceLineNo">1064</span> if (callSender != null) {<a
name="line.1064"></a>
+<span class="sourceLineNo">1065</span> callSender.close();<a
name="line.1065"></a>
+<span class="sourceLineNo">1066</span> }<a name="line.1066"></a>
+<span class="sourceLineNo">1067</span> notifyAll();<a
name="line.1067"></a>
+<span class="sourceLineNo">1068</span> }<a name="line.1068"></a>
+<span class="sourceLineNo">1069</span> return ret;<a name="line.1069"></a>
+<span class="sourceLineNo">1070</span> }<a name="line.1070"></a>
+<span class="sourceLineNo">1071</span><a name="line.1071"></a>
+<span class="sourceLineNo">1072</span><a name="line.1072"></a>
+<span class="sourceLineNo">1073</span> /**<a name="line.1073"></a>
+<span class="sourceLineNo">1074</span> * Cleanup the calls older than a
given timeout, in milli seconds.<a name="line.1074"></a>
+<span class="sourceLineNo">1075</span> * @param allCalls true for all
calls, false for only the calls in timeout<a name="line.1075"></a>
+<span class="sourceLineNo">1076</span> */<a name="line.1076"></a>
+<span class="sourceLineNo">1077</span> protected synchronized void
cleanupCalls(boolean allCalls) {<a name="line.1077"></a>
+<span class="sourceLineNo">1078</span> Iterator<Entry<Integer,
Call>> itor = calls.entrySet().iterator();<a name="line.1078"></a>
+<span class="sourceLineNo">1079</span> while (itor.hasNext()) {<a
name="line.1079"></a>
+<span class="sourceLineNo">1080</span> Call c =
itor.next().getValue();<a name="line.1080"></a>
+<span class="sourceLineNo">1081</span> if (c.done) {<a
name="line.1081"></a>
+<span class="sourceLineNo">1082</span> // To catch the calls without
timeout that were cancelled.<a name="line.1082"></a>
+<span class="sourceLineNo">1083</span> itor.remove();<a
name="line.1083"></a>
+<span class="sourceLineNo">1084</span> } else if (allCalls) {<a
name="line.1084"></a>
+<span class="sourceLineNo">1085</span> long waitTime =
EnvironmentEdgeManager.currentTime() - c.getStartTime();<a name="line.1085"></a>
+<span class="sourceLineNo">1086</span> IOException ie = new
ConnectionClosingException("Connection to " + getRemoteAddress()<a
name="line.1086"></a>
+<span class="sourceLineNo">1087</span> + " is closing. Call id="
+ c.id + ", waitTime=" + waitTime);<a name="line.1087"></a>
+<span class="sourceLineNo">1088</span> c.setException(ie);<a
name="line.1088"></a>
<span class="sourceLineNo">1089</span> itor.remove();<a
name="line.1089"></a>
-<span class="sourceLineNo">1090</span> } else {<a name="line.1090"></a>
-<span class="sourceLineNo">1091</span> // We expect the call to be
ordered by timeout. It may not be the case, but stopping<a name="line.1091"></a>
-<span class="sourceLineNo">1092</span> // at the first valid call
allows to be sure that we still have something to do without<a
name="line.1092"></a>
-<span class="sourceLineNo">1093</span> // spending too much time by
reading the full list.<a name="line.1093"></a>
-<span class="sourceLineNo">1094</span> break;<a name="line.1094"></a>
-<span class="sourceLineNo">1095</span> }<a name="line.1095"></a>
-<span class="sourceLineNo">1096</span> }<a name="line.1096"></a>
-<span class="sourceLineNo">1097</span> }<a name="line.1097"></a>
-<span class="sourceLineNo">1098</span> }<a name="line.1098"></a>
-<span class="sourceLineNo">1099</span><a name="line.1099"></a>
-<span class="sourceLineNo">1100</span> /**<a name="line.1100"></a>
-<span class="sourceLineNo">1101</span> * Used in test only. Construct an IPC
cluster client whose values are of the<a name="line.1101"></a>
-<span class="sourceLineNo">1102</span> * {@link Message} class.<a
name="line.1102"></a>
-<span class="sourceLineNo">1103</span> * @param conf configuration<a
name="line.1103"></a>
-<span class="sourceLineNo">1104</span> * @param clusterId the cluster id<a
name="line.1104"></a>
-<span class="sourceLineNo">1105</span> * @param factory socket factory<a
name="line.1105"></a>
-<span class="sourceLineNo">1106</span> */<a name="line.1106"></a>
-<span class="sourceLineNo">1107</span> @VisibleForTesting<a
name="line.1107"></a>
-<span class="sourceLineNo">1108</span> RpcClientImpl(Configuration conf,
String clusterId, SocketFactory factory) {<a name="line.1108"></a>
-<span class="sourceLineNo">1109</span> this(conf, clusterId, factory, null,
null);<a name="line.1109"></a>
-<span class="sourceLineNo">1110</span> }<a name="line.1110"></a>
-<span class="sourceLineNo">1111</span><a name="line.1111"></a>
-<span class="sourceLineNo">1112</span> /**<a name="line.1112"></a>
-<span class="sourceLineNo">1113</span> * Construct an IPC cluster client
whose values are of the {@link Message} class.<a name="line.1113"></a>
-<span class="sourceLineNo">1114</span> * @param conf configuration<a
name="line.1114"></a>
-<span class="sourceLineNo">1115</span> * @param clusterId the cluster id<a
name="line.1115"></a>
-<span class="sourceLineNo">1116</span> * @param factory socket factory<a
name="line.1116"></a>
-<span class="sourceLineNo">1117</span> * @param localAddr client socket bind
address<a name="line.1117"></a>
-<span class="sourceLineNo">1118</span> * @param metrics the connection
metrics<a name="line.1118"></a>
-<span class="sourceLineNo">1119</span> */<a name="line.1119"></a>
-<span class="sourceLineNo">1120</span> RpcClientImpl(Configuration conf,
String clusterId, SocketFactory factory,<a name="line.1120"></a>
-<span class="sourceLineNo">1121</span> SocketAddress localAddr,
MetricsConnection metrics) {<a name="line.1121"></a>
-<span class="sourceLineNo">1122</span> super(conf, clusterId, localAddr,
metrics);<a name="line.1122"></a>
-<span class="sourceLineNo">1123</span><a name="line.1123"></a>
-<span class="sourceLineNo">1124</span> this.socketFactory = factory;<a
name="line.1124"></a>
-<span class="sourceLineNo">1125</span> this.connections = new
PoolMap<ConnectionId, Connection>(getPoolType(conf),
getPoolSize(conf));<a name="line.1125"></a>
-<span class="sourceLineNo">1126</span> this.failedServers = new
FailedServers(conf);<a name="line.1126"></a>
-<span class="sourceLineNo">1127</span> }<a name="line.1127"></a>
-<span class="sourceLineNo">1128</span><a name="line.1128"></a>
-<span class="sourceLineNo">1129</span> /**<a name="line.1129"></a>
-<span class="sourceLineNo">1130</span> * Used in test only. Construct an IPC
client for the cluster {@code clusterId} with<a name="line.1130"></a>
-<span class="sourceLineNo">1131</span> * the default SocketFactory<a
name="line.1131"></a>
-<span class="sourceLineNo">1132</span> */<a name="line.1132"></a>
-<span class="sourceLineNo">1133</span> @VisibleForTesting<a
name="line.1133"></a>
-<span class="sourceLineNo">1134</span> RpcClientImpl(Configuration conf,
String clusterId) {<a name="line.1134"></a>
-<span class="sourceLineNo">1135</span> this(conf, clusterId,
NetUtils.getDefaultSocketFactory(conf), null, null);<a name="line.1135"></a>
-<span class="sourceLineNo">1136</span> }<a name="line.1136"></a>
-<span class="sourceLineNo">1137</span><a name="line.1137"></a>
-<span class="sourceLineNo">1138</span> /**<a name="line.1138"></a>
-<span class="sourceLineNo">1139</span> * Construct an IPC client for the
cluster {@code clusterId} with the default SocketFactory<a name="line.1139"></a>
-<span class="sourceLineNo">1140</span> *<a name="line.1140"></a>
-<span class="sourceLineNo">1141</span> * This method is called with
reflection by the RpcClientFactory to create an instance<a name="line.1141"></a>
+<span class="sourceLineNo">1090</span> } else if
(c.checkAndSetTimeout()) {<a name="line.1090"></a>
+<span class="sourceLineNo">1091</span> itor.remove();<a
name="line.1091"></a>
+<span class="sourceLineNo">1092</span> } else {<a name="line.1092"></a>
+<span class="sourceLineNo">1093</span> // We expect the call to be
ordered by timeout. It may not be the case, but stopping<a name="line.1093"></a>
+<span class="sourceLineNo">1094</span> // at the first valid call
allows to be sure that we still have something to do without<a
name="line.1094"></a>
+<span class="sourceLineNo">1095</span> // spending too much time by
reading the full list.<a name="line.1095"></a>
+<span class="sourceLineNo">1096</span> break;<a name="line.1096"></a>
+<span class="sourceLineNo">1097</span> }<a name="line.1097"></a>
+<span class="sourceLineNo">1098</span> }<a name="line.1098"></a>
+<span class="sourceLineNo">1099</span> }<a name="line.1099"></a>
+<span class="sourceLineNo">1100</span> }<a name="line.1100"></a>
+<span class="sourceLineNo">1101</span><a name="line.1101"></a>
+<span class="sourceLineNo">1102</span> /**<a name="line.1102"></a>
+<span class="sourceLineNo">1103</span> * Used in test only. Construct an IPC
cluster client whose values are of the<a name="line.1103"></a>
+<span class="sourceLineNo">1104</span> * {@link Message} class.<a
name="line.1104"></a>
+<span class="sourceLineNo">1105</span> * @param conf configuration<a
name="line.1105"></a>
+<span class="sourceLineNo">1106</span> * @param clusterId the cluster id<a
name="line.1106"></a>
+<span class="sourceLineNo">1107</span> * @param factory socket factory<a
name="line.1107"></a>
+<span class="sourceLineNo">1108</span> */<a name="line.1108"></a>
+<span class="sourceLineNo">1109</span> @VisibleForTesting<a
name="line.1109"></a>
+<span class="sourceLineNo">1110</span> RpcClientImpl(Configuration conf,
String clusterId, SocketFactory factory) {<a name="line.1110"></a>
+<span class="sourceLineNo">1111</span> this(conf, clusterId, factory, null,
null);<a name="line.1111"></a>
+<span class="sourceLineNo">1112</span> }<a name="line.1112"></a>
+<span class="sourceLineNo">1113</span><a name="line.1113"></a>
+<span class="sourceLineNo">1114</span> /**<a name="line.1114"></a>
+<span class="sourceLineNo">1115</span> * Construct an IPC cluster client
whose values are of the {@link Message} class.<a name="line.1115"></a>
+<span class="sourceLineNo">1116</span> * @param conf configuration<a
name="line.1116"></a>
+<span class="sourceLineNo">1117</span> * @param clusterId the cluster id<a
name="line.1117"></a>
+<span class="sourceLineNo">1118</span> * @param factory socket factory<a
name="line.1118"></a>
+<span class="sourceLineNo">1119</span> * @param localAddr client socket bind
address<a name="line.1119"></a>
+<span class="sourceLineNo">1120</span> * @param metrics the connection
metrics<a name="line.1120"></a>
+<span class="sourceLineNo">1121</span> */<a name="line.1121"></a>
+<span class="sourceLineNo">1122</span> RpcClientImpl(Configuration conf,
String clusterId, SocketFactory factory,<a name="line.1122"></a>
+<span class="sourceLineNo">1123</span> SocketAddress localAddr,
MetricsConnection metrics) {<a name="line.1123"></a>
+<span class="sourceLineNo">1124</span> super(conf, clusterId, localAddr,
metrics);<a name="line.1124"></a>
+<span class="sourceLineNo">1125</span><a name="line.1125"></a>
+<span class="sourceLineNo">1126</span> this.socketFactory = factory;<a
name="line.1126"></a>
+<span class="sourceLineNo">1127</span> this.connections = new
PoolMap<ConnectionId, Connection>(getPoolType(conf),
getPoolSize(conf));<a name="line.1127"></a>
+<span class="sourceLineNo">1128</span> this.failedServers = new
FailedServers(conf);<a name="line.1128"></a>
+<span class="sourceLineNo">1129</span> }<a name="line.1129"></a>
+<span class="sourceLineNo">1130</span><a name="line.1130"></a>
+<span class="sourceLineNo">1131</span> /**<a name="line.1131"></a>
+<span class="sourceLineNo">1132</span> * Used in test only. Construct an IPC
client for the cluster {@code clusterId} with<a name="line.1132"></a>
+<span class="sourceLineNo">1133</span> * the default SocketFactory<a
name="line.1133"></a>
+<span class="sourceLineNo">1134</span> */<a name="line.1134"></a>
+<span class="sourceLineNo">1135</span> @VisibleForTesting<a
name="line.1135"></a>
+<span class="sourceLineNo">1136</span> RpcClientImpl(Configuration conf,
String clusterId) {<a name="line.1136"></a>
+<span class="sourceLineNo">1137</span> this(conf, clusterId,
NetUtils.getDefaultSocketFactory(conf), null, null);<a name="line.1137"></a>
+<span class="sourceLineNo">1138</span> }<a name="line.1138"></a>
+<span class="sourceLineNo">1139</span><a name="line.1139"></a>
+<span class="sourceLineNo">1140</span> /**<a name="line.1140"></a>
+<span class="sourceLineNo">1141</span> * Construct an IPC client for the
cluster {@code clusterId} with the default SocketFactory<a name="line.1141"></a>
<span class="sourceLineNo">1142</span> *<a name="line.1142"></a>
-<span class="sourceLineNo">1143</span> * @param conf configuration<a
name="line.1143"></a>
-<span class="sourceLineNo">1144</span> * @param clusterId the cluster id<a
name="line.1144"></a>
-<span class="sourceLineNo">1145</span> * @param localAddr client socket bind
address.<a name="line.1145"></a>
-<span class="sourceLineNo">1146</span> * @param metrics the connection
metrics<a name="line.1146"></a>
-<span class="sourceLineNo">1147</span> */<a name="line.1147"></a>
-<span class="sourceLineNo">1148</span> public RpcClientImpl(Configuration
conf, String clusterId, SocketAddress localAddr,<a name="line.1148"></a>
-<span class="sourceLineNo">1149</span> MetricsConnection metrics) {<a
name="line.1149"></a>
-<span class="sourceLineNo">1150</span> this(conf, clusterId,
NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);<a
name="line.1150"></a>
-<span class="sourceLineNo">1151</span> }<a name="line.1151"></a>
-<span class="sourceLineNo">1152</span><a name="line.1152"></a>
-<span class="sourceLineNo">1153</span> /** Stop all threads related to this
client. No further calls may be made<a name="line.1153"></a>
-<span class="sourceLineNo">1154</span> * using this client. */<a
name="line.1154"></a>
-<span class="sourceLineNo">1155</span> @Override<a name="line.1155"></a>
-<span class="sourceLineNo">1156</span> public void close() {<a
name="line.1156"></a>
-<span class="sourceLineNo">1157</span> if (LOG.isDebugEnabled())
LOG.debug("Stopping rpc client");<a name="line.1157"></a>
-<span class="sourceLineNo">1158</span> if (!running.compareAndSet(true,
false)) return;<a name="line.1158"></a>
-<span class="sourceLineNo">1159</span><a name="line.1159"></a>
-<span class="sourceLineNo">1160</span> Set<Connection> connsToClose =
null;<a name="line.1160"></a>
-<span class="sourceLineNo">1161</span> // wake up all connections<a
name="line.1161"></a>
-<span class="sourceLineNo">1162</span> synchronized (connections) {<a
name="line.1162"></a>
-<span class="sourceLineNo">1163</span> for (Connection conn :
connections.values()) {<a name="line.1163"></a>
-<span class="sourceLineNo">1164</span> conn.interrupt();<a
name="line.1164"></a>
-<span class="sourceLineNo">1165</span> if (conn.callSender != null) {<a
name="line.1165"></a>
-<span class="sourceLineNo">1166</span> conn.callSender.interrupt();<a
name="line.1166"></a>
-<span class="sourceLineNo">1167</span> }<a name="line.1167"></a>
-<span class="sourceLineNo">1168</span><a name="line.1168"></a>
-<span class="sourceLineNo">1169</span> // In case the CallSender did
not setupIOStreams() yet, the Connection may not be started<a
name="line.1169"></a>
-<span class="sourceLineNo">1170</span> // at all (if CallSender has a
cancelled Call it can happen). See HBASE-13851<a name="line.1170"></a>
-<span class="sourceLineNo">1171</span> if (!conn.isAlive()) {<a
name="line.1171"></a>
-<span class="sourceLineNo">1172</span> if (connsToClose == null) {<a
name="line.1172"></a>
-<span class="sourceLineNo">1173</span> connsToClose = new
HashSet<Connection>();<a name="line.1173"></a>
-<span class="sourceLineNo">1174</span> }<a name="line.1174"></a>
-<span class="sourceLineNo">1175</span> connsToClose.add(conn);<a
name="line.1175"></a>
-<span class="sourceLineNo">1176</span> }<a name="line.1176"></a>
-<span class="sourceLineNo">1177</span> }<a name="line.1177"></a>
-<span class="sourceLineNo">1178</span> }<a name="line.1178"></a>
-<span class="sourceLineNo">1179</span> if (connsToClose != null) {<a
name="line.1179"></a>
-<span class="sourceLineNo">1180</span> for (Connection conn :
connsToClose) {<a name="line.1180"></a>
-<span class="sourceLineNo">1181</span> if (conn.markClosed(new
InterruptedIOException("RpcClient is closing"))) {<a name="line.1181"></a>
-<span class="sourceLineNo">1182</span> conn.close();<a
name="line.1182"></a>
-<span class="sourceLineNo">1183</span> }<a name="line.1183"></a>
-<span class="sourceLineNo">1184</span> }<a name="line.1184"></a>
-<span class="sourceLineNo">1185</span> }<a name="line.1185"></a>
-<span class="sourceLineNo">1186</span> // wait until all connections are
closed<a name="line.1186"></a>
-<span class="sourceLineNo">1187</span> while (!connections.isEmpty()) {<a
name="line.1187"></a>
-<span class="sourceLineNo">1188</span> try {<a name="line.1188"></a>
-<span class="sourceLineNo">1189</span> Thread.sleep(10);<a
name="line.1189"></a>
-<span class="sourceLineNo">1190</span> } catch (InterruptedException e)
{<a name="line.1190"></a>
-<span class="sourceLineNo">1191</span> LOG.info("Interrupted while
stopping the client. We still have " + connections.size() +<a
name="line.1191"></a>
-<span class="sourceLineNo">1192</span> " connections.");<a
name="line.1192"></a>
-<span class="sourceLineNo">1193</span>
Thread.currentThread().interrupt();<a name="line.1193"></a>
-<span class="sourceLineNo">1194</span> return;<a name="line.1194"></a>
-<span class="sourceLineNo">1195</span> }<a name="line.1195"></a>
-<span class="sourceLineNo">1196</span> }<a name="line.1196"></a>
-<span class="sourceLineNo">1197</span> }<a name="line.1197"></a>
-<span class="sourceLineNo">1198</span><a name="line.1198"></a>
-<span class="sourceLineNo">1199</span> /** Make a call, passing
<code>param</code>, to the IPC server running at<a
name="line.1199"></a>
-<span class="sourceLineNo">1200</span> * <code>address</code>
which is servicing the <code>protocol</code> protocol,<a
name="line.1200"></a>
-<span class="sourceLineNo">1201</span> * with the
<code>ticket</code> credentials, returning the value.<a
name="line.1201"></a>
-<span class="sourceLineNo">1202</span> * Throws exceptions if there are
network problems or if the remote code<a name="line.1202"></a>
-<span class="sourceLineNo">1203</span> * threw an exception.<a
name="line.1203"></a>
-<span class="sourceLineNo">1204</span> * @param ticket Be careful which
ticket you pass. A new user will mean a new Connection.<a name="line.1204"></a>
-<span class="sourceLineNo">1205</span> * {@link
UserProvider#getCurrent()} makes a new instance of User each time so will be
a<a name="line.1205"></a>
-<span class="sourceLineNo">1206</span> * new Connection each
time.<a name="line.1206"></a>
-<span class="sourceLineNo">1207</span> * @return A pair with the Message
response and the Cell data (if any).<a name="line.1207"></a>
-<span class="sourceLineNo">1208</span> * @throws InterruptedException<a
name="line.1208"></a>
-<span class="sourceLineNo">1209</span> * @throws IOException<a
name="line.1209"></a>
-<span class="sourceLineNo">1210</span> */<a name="line.1210"></a>
-<span class="sourceLineNo">1211</span> @Override<a name="line.1211"></a>
-<span class="sourceLineNo">1212</span> protected Pair<Message,
CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,<a
name="line.1212"></a>
-<span class="sourceLineNo">1213</span> Message param, Message returnType,
User ticket, InetSocketAddress addr,<a name="line.1213"></a>
-<span class="sourceLineNo">1214</span> MetricsConnection.CallStats
callStats)<a name="line.1214"></a>
-<span class="sourceLineNo">1215</span> throws IOException,
InterruptedException {<a name="line.1215"></a>
-<span class="sourceLineNo">1216</span> if (pcrc == null) {<a
name="line.1216"></a>
-<span class="sourceLineNo">1217</span> pcrc = new
PayloadCarryingRpcController();<a name="line.1217"></a>
-<span class="sourceLineNo">1218</span> }<a name="line.1218"></a>
-<span class="sourceLineNo">1219</span> CellScanner cells =
pcrc.cellScanner();<a name="line.1219"></a>
-<span class="sourceLineNo">1220</span><a name="line.1220"></a>
-<span class="sourceLineNo">1221</span> final Call call = new
Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,<a
name="line.1221"></a>
-<span class="sourceLineNo">1222</span> pcrc.getCallTimeout(),
MetricsConnection.newCallStats());<a name="line.1222"></a>
-<span class="sourceLineNo">1223</span><a name="line.1223"></a>
-<span class="sourceLineNo">1224</span> final Connection connection =
getConnection(ticket, call, addr);<a name="line.1224"></a>
+<span class="sourceLineNo">1143</span> * This method is called with
reflection by the RpcClientFactory to create an instance<a name="line.1143"></a>
+<span class="sourceLineNo">1144</span> *<a name="line.1144"></a>
+<span class="sourceLineNo">1145</span> * @param conf configuration<a
name="line.1145"></a>
+<span class="sourceLineNo">1146</span> * @param clusterId the cluster id<a
name="line.1146"></a>
+<span class="sourceLineNo">1147</span> * @param localAddr client socket bind
address.<a name="line.1147"></a>
+<span class="sourceLineNo">1148</span> * @param metrics the connection
metrics<a name="line.1148"></a>
+<span class="sourceLineNo">1149</span> */<a name="line.1149"></a>
+<span class="sourceLineNo">1150</span> public RpcClientImpl(Configuration
conf, String clusterId, SocketAddress localAddr,<a name="line.1150"></a>
+<span class="sourceLineNo">1151</span> MetricsConnection metrics) {<a
name="line.1151"></a>
+<span class="sourceLineNo">1152</span> this(conf, clusterId,
NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);<a
name="line.1152"></a>
+<span class="sourceLineNo">1153</span> }<a name="line.1153"></a>
+<span class="sourceLineNo">1154</span><a name="line.1154"></a>
+<span class="sourceLineNo">1155</span> /** Stop all threads related to this
client. No further calls may be made<a name="line.1155"></a>
+<span class="sourceLineNo">1156</span> * using this client. */<a
name="line.1156"></a>
+<span class="sourceLineNo">1157</span> @Override<a name="line.1157"></a>
+<span class="sourceLineNo">1158</span> public void close() {<a
name="line.1158"></a>
+<span class="sourceLineNo">1159</span> if (LOG.isDebugEnabled())
LOG.debug("Stopping rpc client");<a name="line.1159"></a>
+<span class="sourceLineNo">1160</span> if (!running.compareAndSet(true,
false)) return;<a name="line.1160"></a>
+<span class="sourceLineNo">1161</span><a name="line.1161"></a>
+<span class="sourceLineNo">1162</span> Set<Connection> connsToClose =
null;<a name="line.1162"></a>
+<span class="sourceLineNo">1163</span> // wake up all connections<a
name="line.1163"></a>
+<span class="sourceLineNo">1164</span> synchronized (connections) {<a
name="line.1164"></a>
+<span class="sourceLineNo">1165</span> for (Connection conn :
connections.values()) {<a name="line.1165"></a>
+<span class="sourceLineNo">1166</span> conn.interrupt();<a
name="line.1166"></a>
+<span class="sourceLineNo">1167</span> if (conn.callSender != null) {<a
name="line.1167"></a>
+<span class="sourceLineNo">1168</span> conn.callSender.interrupt();<a
name="line.1168"></a>
+<span class="sourceLineNo">1169</span> }<a name="line.1169"></a>
+<span class="sourceLineNo">1170</span><a name="line.1170"></a>
+<span class="sourceLineNo">1171</span> // In case the CallSender did
not setupIOStreams() yet, the Connection may not be started<a
name="line.1171"></a>
+<span class="sourceLineNo">1172</span> // at all (if CallSender has a
cancelled Call it can happen). See HBASE-13851<a name="line.1172"></a>
+<span class="sourceLineNo">1173</span> if (!conn.isAlive()) {<a
name="line.1173"></a>
+<span class="sourceLineNo">1174</span> if (connsToClose == null) {<a
name="line.1174"></a>
+<span class="sourceLineNo">1175</span> connsToClose = new
HashSet<Connection>();<a name="line.1175"></a>
+<span class="sourceLineNo">1176</span> }<a name="line.1176"></a>
+<span class="sourceLineNo">1177</span> connsToClose.add(conn);<a
name="line.1177"></a>
+<span class="sourceLineNo">1178</span> }<a name="line.1178"></a>
+<span class="sourceLineNo">1179</span> }<a name="line.1179"></a>
+<span class="sourceLineNo">1180</span> }<a name="line.1180"></a>
+<span class="sourceLineNo">1181</span> if (connsToClose != null) {<a
name="line.1181"></a>
+<span class="sourceLineNo">1182</span> for (Connection conn :
connsToClose) {<a name="line.1182"></a>
+<span class="sourceLineNo">1183</span> if (conn.markClosed(new
InterruptedIOException("RpcClient is closing"))) {<a name="line.1183"></a>
+<span class="sourceLineNo">1184</span> conn.close();<a
name="line.1184"></a>
+<span class="sourceLineNo">1185</span> }<a name="line.1185"></a>
+<span class="sourceLineNo">1186</span> }<a name="line.1186"></a>
+<span class="sourceLineNo">1187</span> }<a name="line.1187"></a>
+<span class="sourceLineNo">1188</span> // wait until all connections are
closed<a name="line.1188"></a>
+<span class="sourceLineNo">1189</span> while (!connections.isEmpty()) {<a
name="line.1189"></a>
+<span class="sourceLineNo">1190</span> try {<a name="line.1190"></a>
+<span class="sourceLineNo">1191</span> Thread.sleep(10);<a
name="line.1191"></a>
+<span class="sourceLineNo">1192</span> } catch (InterruptedException e)
{<a name="line.1192"></a>
+<span class="sourceLineNo">1193</span> LOG.info("Interrupted while
stopping the client. We still have " + connections.size() +<a
name="line.1193"></a>
+<span class="sourceLineNo">1194</span> " connections.");<a
name="line.1194"></a>
+<span class="sourceLineNo">1195</span>
Thread.currentThread().interrupt();<a name="line.1195"></a>
+<span class="sourceLineNo">1196</span> return;<a name="line.1196"></a>
+<span class="sourceLineNo">1197</span> }<a name="line.1197"></a>
+<span class="sourceLineNo">1198</span> }<a name="line.1198"></a>
+<span class="sourceLineNo">1199</span> }<a name="line.1199"></a>
+<span class="sourceLineNo">1200</span><a name="line.1200"></a>
+<span class="sourceLineNo">1201</span> /** Make a call, passing
<code>param</code>, to the IPC server running at<a
name="line.1201"></a>
+<span class="sourceLineNo">1202</span> * <code>address</code>
which is servicing the <code>protocol</code> protocol,<a
name="line.1202"></a>
+<span class="sourceLineNo">1203</span> * with the
<code>ticket</code> credentials, returning the value.<a
name="line.1203"></a>
+<span class="sourceLineNo">1204</span> * Throws exceptions if there are
network problems or if the remote code<a name="line.1204"></a>
+<span class="sourceLineNo">1205</span> * threw an exception.<a
name="line.1205"></a>
+<span class="sourceLineNo">1206</span> * @param ticket Be careful which
ticket you pass. A new user will mean a new Connection.<a name="line.1206"></a>
+<span class="sourceLineNo">1207</span> * {@link
UserProvider#getCurrent()} makes a new instance of User each time so will be
a<a name="line.1207"></a>
+<span class="sourceLineNo">1208</span> * new Connection each
time.<a name="line.1208"></a>
+<span class="sourceLineNo">1209</span> * @return A pair with the Message
response and the Cell data (if any).<a name="line.1209"></a>
+<span class="sourceLineNo">1210</span> * @throws InterruptedException<a
name="line.1210"></a>
+<span class="sourceLineNo">1211</span> * @throws IOException<a
name="line.1211"></a>
+<span class="sourceLineNo">1212</span> */<a name="line.1212"></a>
+<span class="sourceLineNo">1213</span> @Override<a name="line.1213"></a>
+<span class="sourceLineNo">1214</span> protected Pair<Message,
CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,<a
name="line.1214"></a>
+<span class="sourceLineNo">1215</span> Message param, Message returnType,
User ticket, InetSocketAddress addr,<a name="line.1215"></a>
+<span class="sourceLineNo">1216</span> MetricsConnection.CallStats
callStats)<a name="line.1216"></a>
+<span class="sourceLineNo">1217</span> throws IOException,
InterruptedException {<a name="line.1217"></a>
+<span class="sourceLineNo">1218</span> if (pcrc == null) {<a
name="line.1218"></a>
+<span class="sourceLineNo">1219</span> pcrc = new
PayloadCarryingRpcController();<a name="line.1219"></a>
+<span class="sourceLineNo">1220</span> }<a name="line.1220"></a>
+<span class="sourceLineNo">1221</span> CellScanner cells =
pcrc.cellScanner();<a name="line.1221"></a>
+<span class="sourceLineNo">1222</span><a name="line.1222"></a>
+<span class="sourceLineNo">1223</span> final Call call = new
Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,<a
name="line.1223"></a>
+<span class="sourceLineNo">1224</span> pcrc.getCallTimeout(),
MetricsConnection.newCallStats());<a name="line.1224"></a>
<span class="sourceLineNo">1225</span><a name="line.1225"></a>
-<span class="sourceLineNo">1226</span> final CallFuture cts;<a
name="line.1226"></a>
-<span class="sourceLineNo">1227</span> if (connection.callSender != null)
{<a name="line.1227"></a>
-<span class="sourceLineNo">1228</span> cts =
connection.callSender.sendCall(call, pcrc.getPriority(),
Trace.currentSpan());<a name="line.1228"></a>
-<span class="sourceLineNo">1229</span> pcrc.notifyOnCancel(new
RpcCallback<Object>() {<a name="line.1229"></a>
-<span class="sourceLineNo">1230</span> @Override<a
name="line.1230"></a>
-<span class="sourceLineNo">1231</span> public void run(Object
parameter) {<a name="line.1231"></a>
-<span class="sourceLineNo">1232</span>
connection.callSender.remove(cts);<a name="line.1232"></a>
-<span class="sourceLineNo">1233</span> }<a name="line.1233"></a>
-<span class="sourceLineNo">1234</span> });<a name="line.1234"></a>
-<span class="sourceLineNo">1235</span> if (pcrc.isCanceled()) {<a
name="line.1235"></a>
-<span class="sourceLineNo">1236</span> // To finish if the call was
cancelled before we set the notification (race condition)<a
name="line.1236"></a>
-<span class="sourceLineNo">1237</span> call.callComplete();<a
name="line.1237"></a>
-<span class="sourceLineNo">1238</span> return new Pair<Message,
CellScanner>(call.response, call.cells);<a name="line.1238"></a>
-<span class="sourceLineNo">1239</span> }<a name="line.1239"></a>
-<span class="sourceLineNo">1240</span> } else {<a name="line.1240"></a>
-<span class="sourceLineNo">1241</span> cts = null;<a name="line.1241"></a>
-<span class="sourceLineNo">1242</span>
connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());<a
name="line.1242"></a>
-<span class="sourceLineNo">1243</span> }<a name="line.1243"></a>
-<span class="sourceLineNo">1244</span><a name="line.1244"></a>
-<span class="sourceLineNo">1245</span> while (!call.done) {<a
name="line.1245"></a>
-<span class="sourceLineNo">1246</span> if (call.checkAndSetTimeout()) {<a
name="line.1246"></a>
-<span class="sourceLineNo">1247</span> if (cts != null)
connection.callSender.remove(cts);<a name="line.1247"></a>
-<span class="sourceLineNo">1248</span> break;<a name="line.1248"></a>
-<span class="sourceLineNo">1249</span> }<a name="line.1249"></a>
-<span class="sourceLineNo">1250</span> if
(connection.shouldCloseConnection.get()) {<a name="line.1250"></a>
-<span class="sourceLineNo">1251</span> throw new
ConnectionClosingException("Call id=" + call.id +<a name="line.1251"></a>
-<span class="sourceLineNo">1252</span> " on server " + addr + "
aborted: connection is closing");<a name="line.1252"></a>
-<span class="sourceLineNo">1253</span> }<a name="line.1253"></a>
-<span class="sourceLineNo">1254</span> try {<a name="line.1254"></a>
-<span class="sourceLineNo">1255</span> synchronized (call) {<a
name="line.1255"></a>
-<span class="sourceLineNo">1256</span> if (call.done) break;<a
name="line.1256"></a>
-<span class="sourceLineNo">1257</span>
call.wait(Math.min(call.remainingTime(), 1000) + 1);<a name="line.1257"></a>
-<span class="sourceLineNo">1258</span> }<a name="line.1258"></a>
-<span class="sourceLineNo">1259</span> } catch (InterruptedException e)
{<a name="line.1259"></a>
-<span class="sourceLineNo">1260</span> call.setException(new
InterruptedIOException());<a name="line.1260"></a>
-<span class="sourceLineNo">1261</span> if (cts != null)
connection.callSender.remove(cts);<a name="line.1261"></a>
-<span class="sourceLineNo">1262</span> throw e;<a name="line.1262"></a>
-<span class="sourceLineNo">1263</span> }<a name="line.1263"></a>
-<span class="sourceLineNo">1264</span> }<a name="line.1264"></a>
-<span class="sourceLineNo">1265</span><a name="line.1265"></a>
-<span class="sourceLineNo">1266</span> if (call.error != null) {<a
name="line.1266"></a>
-<span class="sourceLineNo">1267</span> if (call.error instanceof
RemoteException) {<a name="line.1267"></a>
-<span class="sourceLineNo">1268</span> call.error.fillInStackTrace();<a
name="line.1268"></a>
-<span class="sourceLineNo">1269</span> throw call.error;<a
name="line.1269"></a>
-<span class="sourceLineNo">1270</span> }<a name="line.1270"></a>
-<span class="sourceLineNo">1271</span> // local exception<a
name="line.1271"></a>
-<span class="sourceLineNo">1272</span> throw wrapException(addr,
call.error);<a name="line.1272"></a>
-<span class="sourceLineNo">1273</span> }<a name="line.1273"></a>
-<span class="sourceLineNo">1274</span><a name="line.1274"></a>
-<span class="sourceLineNo">1275</span> return new Pair<Message,
CellScanner>(call.response, call.cells);<a name="line.1275"></a>
-<span class="sourceLineNo">1276</span> }<a name="line.1276"></a>
-<span class="sourceLineNo">1277</span><a name="line.1277"></a>
-<span class="sourceLineNo">1278</span><a name="line.1278"></a>
-<span class="sourceLineNo">1279</span> /**<a name="line.1279"></a>
-<span class="sourceLineNo">1280</span> * Interrupt the connections to the
given ip:port server. This should be called if the server<a
name="line.1280"></a>
-<span class="sourceLineNo">1281</span> * is known as actually dead. This
will not prevent current operation to be retried, and,<a name="line.1281"></a>
-<span class="sourceLineNo">1282</span> * depending on their own behavior,
they may retry on the same server. This can be a feature,<a
name="line.1282"></a>
-<span class="sourceLineNo">1283</span> * for example at startup. In any
case, they're likely to get connection refused (if the<a name="line.1283"></a>
-<span class="sourceLineNo">1284</span> * process died) or no route to host:
i.e. their next retries should be faster and with a<a name="line.1284"></a>
-<span class="sourceLineNo">1285</span> * safe exception.<a
name="line.1285"></a>
-<span class="sourceLineNo">1286</span> */<a name="line.1286"></a>
-<span class="sourceLineNo">1287</span> @Override<a name="line.1287"></a>
-<span class="sourceLineNo">1288</span> public void
cancelConnections(ServerName sn) {<a name="line.1288"></a>
-<span class="sourceLineNo">1289</span> synchronized (connections) {<a
name="line.1289"></a>
-<span class="sourceLineNo">1290</span> for (Connection connection :
connections.values()) {<a name="line.1290"></a>
-<span class="sourceLineNo">1291</span> if (connection.isAlive()
&&<a name="line.1291"></a>
-<span class="sourceLineNo">1292</span>
connection.getRemoteAddress().getPort() == sn.getPort() &&<a
name="line.1292"></a>
-<span class="sourceLineNo">1293</span>
connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {<a
name="line.1293"></a>
-<span class="sourceLineNo">1294</span> LOG.info("The server on " +
sn.toString() +<a name="line.1294"></a>
-<span class="sourceLineNo">1295</span> " is dead - stopping the
connection " + connection.remoteId);<a name="line.1295"></a>
-<span class="sourceLineNo">1296</span> connection.interrupt(); //
We're interrupting a Reader. It means we want it to finish.<a
name="line.1296"></a>
-<span class="sourceLineNo">1297</span> //
This will close the connection as well.<a name="line.1297"></a>
-<span class="sourceLineNo">1298</span> }<a name="line.1298"></a>
-<span class="sourceLineNo">1299</span> }<a name="line.1299"></a>
-<span class="sourceLineNo">1300</span> }<a name="line.1300"></a>
-<span class="sourceLineNo">1301</span> }<a name="line.1301"></a>
-<span class="sourceLineNo">1302</span><a name="line.1302"></a>
-<span class="sourceLineNo">1303</span> /**<a name="line.1303"></a>
-<span class="sourceLineNo">1304</span> * Get a connection from the pool, or
create a new one and add it to the<a name="line.1304"></a>
-<span class="sourceLineNo">1305</span> * pool. Connections to a given
host/port are reused.<a name="line.1305"></a>
-<span class="sourceLineNo">1306</span> */<a name="line.1306"></a>
-<span class="sourceLineNo">1307</span> protected Connection
getConnection(User ticket, Call call, InetSocketAddress addr)<a
name="line.1307"></a>
-<span class="sourceLineNo">1308</span> throws IOException {<a
name="line.1308"></a>
-<span class="sourceLineNo">1309</span> if (!running.get()) throw new
StoppedRpcClientException();<a name="line.1309"></a>
-<span class="sourceLineNo">1310</span> Connection connection;<a
name="line.1310"></a>
-<span class="sourceLineNo">1311</span> ConnectionId remoteId =<a
name="line.1311"></a>
-<span class="sourceLineNo">1312</span> new ConnectionId(ticket,
call.md.getService().getName(), addr);<a name="line.1312"></a>
-<span class="sourceLineNo">1313</span> synchronized (connections) {<a
name="line.1313"></a>
-<span class="sourceLineNo">1314</span> connection =
connections.get(remoteId);<a name="line.1314"></a>
-<span class="sourceLineNo">1315</span> if (connection == null) {<a
name="line.1315"></a>
-<span class="sourceLineNo">1316</span> connection =
createConnection(remoteId, this.codec, this.compressor);<a name="line.1316"></a>
-<span class="sourceLineNo">1317</span> connections.put(remoteId,
connection);<a name="line.1317"></a>
-<span class="sourceLineNo">1318</span> }<a name="line.1318"></a>
-<span class="sourceLineNo">1319</span> }<a name="line.1319"></a>
-<span class="sourceLineNo">1320</span><a name="line.1320"></a>
-<span class="sourceLineNo">1321</span> return connection;<a
name="line.1321"></a>
-<span class="sourceLineNo">1322</span> }<a name="line.1322"></a>
-<span class="sourceLineNo">1323</span>}<a name="line.1323"></a>
+<span class="sourceLineNo">1226</span> final Connection connection =
getConnection(ticket, call, addr);<a name="line.1226"></a>
+<span class="sourceLineNo">1227</span><a name="line.1227"></a>
+<span class="sourceLineNo">1228</span> final CallFuture cts;<a
name="line.1228"></a>
+<span class="sourceLineNo">1229</span> if (connection.callSender != null)
{<a name="line.1229"></a>
+<span class="sourceLineNo">1230</span> cts =
connection.callSender.sendCall(call, pcrc.getPriority(),
Trace.currentSpan());<a name="line.1230"></a>
+<span class="sourceLineNo">1231</span> pcrc.notifyOnCancel(new
RpcCallback<Object>() {<a name="line.1231"></a>
+<span class="sourceLineNo">1232</span> @Override<a
name="line.1232"></a>
+<span class="sourceLineNo">1233</span> public void run(Object
parameter) {<a name="line.1233"></a>
+<span class="sourceLineNo">1234</span>
connection.callSender.remove(cts);<a name="line.1234"></a>
+<span class="sourceLineNo">1235</span> }<a name="line.1235"></a>
+<span class="sourceLineNo">1236</span> });<a name="line.1236"></a>
+<span class="sourceLineNo">1237</span> if (pcrc.isCanceled()) {<a
name="line.1237"></a>
+<span class="sourceLineNo">1238</span> // To finish if the call was
cancelled before we set the notification (race condition)<a
name="line.1238"></a>
+<span class="sourceLineNo">1239</span> call.callComplete();<a
name="line.1239"></a>
+<span class="sourceLineNo">1240</span> return new Pair<Message,
CellScanner>(call.response, call.cells);<a name="line.1240"></a>
+<span class="sourceLineNo">1241</span> }<a name="line.1241"></a>
+<span class="sourceLineNo">1242</span> } else {<a name="line.1242"></a>
+<span class="sourceLineNo">1243</span> cts = null;<a name="line.1243"></a>
+<span class="sourceLineNo">1244</span>
connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());<a
name="line.1244"></a>
+<span class="sourceLineNo">1245</span> }<a name="line.1245"></a>
+<span class="sourceLineNo">1246</span><a name="line.1246"></a>
+<span class="sourceLineNo">1247</span> while (!call.done) {<a
name="line.1247"></a>
+<span class="sourceLineNo">1248</span> if (call.checkAndSetTimeout()) {<a
name="line.1248"></a>
+<span class="sourceLineNo">1249</span> if (cts != null)
connection.callSender.remove(cts);<a name="line.1249"></a>
+<span class="sourceLineNo">1250</span> break;<a name="line.1250"></a>
+<span class="sourceLineNo">1251</span> }<a name="line.1251"></a>
+<span class="sourceLineNo">1252</span> if
(connection.shouldCloseConnection.get()) {<a name="line.1252"></a>
+<span class="sourceLineNo">1253</span> throw new
ConnectionClosingException("Call id=" + call.id +<a name="line.1253"></a>
+<span class="sourceLineNo">1254</span> " on server " + addr + "
aborted: connection is closing");<a name="line.1254"></a>
+<span class="sourceLineNo">1255</span> }<a name="line.1255"></a>
+<span class="sourceLineNo">1256</span> try {<a name="line.1256"></a>
+<span class="sourceLineNo">1257</span> synchronized (call) {<a
name="line.1257"></a>
+<span class="sourceLineNo">1258</span> if (call.done) break;<a
name="line.1258"></a>
+<span class="sourceLineNo">1259</span>
call.wait(Math.min(call.remainingTime(), 1000) + 1);<a name="line.1259"></a>
+<span class="sourceLineNo">1260</span> }<a name="line.1260"></a>
+<span class="sourceLineNo">1261</span> } catch (InterruptedException e)
{<a name="line.1261"></a>
+<span class="sourceLineNo">1262</span> call.setException(new
InterruptedIOException());<a name="line.1262"></a>
+<span class="sourceLineNo">1263</span> if (cts != null)
connection.callSender.remove(cts);<a name="line.1263"></a>
+<span class="sourceLineNo">1264</span> throw e;<a name="line.1264"></a>
+<span class="sourceLineNo">1265</span> }<a name="line.1265"></a>
+<span class="sourceLineNo">1266</span> }<a name="line.1266"></a>
+<span class="sourceLineNo">1267</span><a name="line.1267"></a>
+<span class="sourceLineNo">1268</span> if (call.error != null) {<a
name="line.1268"></a>
+<span class="sourceLineNo">1269</span> if (call.error instanceof
RemoteException) {<a name="line.1269"></a>
+<span class="sourceLineNo">1270</span> call.error.fillInStackTrace();<a
name="line.1270"></a>
+<span class="sourceLineNo">1271</span> throw call.error;<a
name="line.1271"></a>
+<span class="sourceLineNo">1272</span> }<a name="line.1272"></a>
+<span class="sourceLineNo">1273</span> // local exception<a
name="line.1273"></a>
+<span class="sourceLineNo">1274</span> throw wrapException(addr,
call.error);<a name="line.1274"></a>
+<span class="sourceLineNo">1275</span> }<a name="line.1275"></a>
+<span class="sourceLineNo">1276</span><a name="line.1276"></a>
+<span class="sourceLineNo">1277</span> return new Pair<Message,
CellScanner>(call.response, call.cells);<a name="line.1277"></a>
+<span class="sourceLineNo">1278</span> }<a name="line.1278"></a>
+<span class="sourceLineNo">1279</span><a name="line.1279"></a>
+<span class="sourceLineNo">1280</span><a name="line.1280"></a>
+<span class="sourceLineNo">1281</span> /**<a name="line.1281"></a>
+<span class="sourceLineNo">1282</span> * Interrupt the connections to the
given ip:port server. This should be called if the server<a
name="line.1282"></a>
+<span class="sourceLineNo">1283</span> * is known as actually dead. This
will not prevent current operation to be retried, and,<a name="line.1283"></a>
+<span class="sourceLineNo">1284</span> * depending on their own behavior,
they may retry on the same server. This can be a feature,<a
name="line.1284"></a>
+<span class="sourceLineNo">1285</span> * for example at startup. In any
case, they're likely to get connection refused (if the<a name="line.1285"></a>
+<span class="sourceLineNo">1286</span> * process died) or no route to host:
i.e. their next retries should be faster and with a<a name="line.1286"></a>
+<span class="sourceLineNo">1287</span> * safe exception.<a
name="line.1287"></a>
+<span class="sourceLineNo">1288</span> */<a name="line.1288"></a>
+<span class="sourceLineNo">1289</span> @Override<a name="line.1289"></a>
+<span class="sourceLineNo">1290</span> public void
cancelConnections(ServerName sn) {<a name="line.1290"></a>
+<span class="sourceLineNo">1291</span> synchronized (connections) {<a
name="line.1291"></a>
+<span class="sourceLineNo">1292</span> for (Connection connection :
connections.values()) {<a name="line.1292"></a>
+<span class="sourceLineNo">1293</span> if (connection.isAlive()
&&<a name="line.1293"></a>
+<span class="sourceLineNo">1294</span>
connection.getRemoteAddress().getPort() == sn.getPort() &&<a
name="line.1294"></a>
+<span class="sourceLineNo">1295</span>
connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {<a
name="line.1295"></a>
+<span class="sourceLineNo">1296</span> LOG.info("The server on " +
sn.toString() +<a name="line.1296"></a>
+<span class="sourceLineNo">1297</span> " is dead - stopping the
connection " + connection.remoteId);<a name="line.1297"></a>
+<span class="sourceLineNo">1298</span> connection.interrupt(); //
We're interrupting a Reader. It means we want it to finish.<a
name="line.1298"></a>
+<span class="sourceLineNo">1299</span> //
This will close the connection as well.<a name="line.1299"></a>
+<span class="sourceLineNo">1300</span> }<a name="line.1300"></a>
+<span class="sourceLineNo">1301</span> }<a name="line.1301"></a>
+<span class="sourceLineNo">1302</span> }<a name="line.1302"></a>
+<span class="sourceLineNo">1303</span> }<a name="line.1303"></a>
+<span class="sourceLineNo">1304</span><a name="line.1304"></a>
+<span class="sourceLineNo">1305</span> /**<a name="line.1305"></a>
+<span class="sourceLineNo">1306</span> * Get a connection from the pool, or
create a new one and add it to the<a name="line.1306"></a>
+<span class="sourceLineNo">1307</span> * pool. Connections to a given
host/port are reused.<a name="line.1307"></a>
+<span class="sourceLineNo">1308</span> */<a name="line.1308"></a>
+<span class="sourceLineNo">1309</span> protected Connection
getConnection(User ticket, Call call, InetSocketAddress addr)<a
name="line.1309"></a>
+<span class="sourceLineNo">1310</span> throws IOException {<a
name="line.1310"></a>
+<span class="sourceLineNo">1311</span> if (!running.get()) throw new
StoppedRpcClientException();<a name="line.1311"></a>
+<span class="sourceLineNo">1312</span> Connection connection;<a
name="line.1312"></a>
+<span class="sourceLineNo">1313</span> ConnectionId remoteId =<a
name="line.1313"></a>
+<span class="sourceLineNo">1314</span> new ConnectionId(ticket,
call.md.getService().getName(), addr);<a name="line.1314"></a>
+<span class="sourceLineNo">1315</span> synchronized (connections) {<a
name="line.1315"></a>
+<span class="sourceLineNo">1316</span> connection =
connections.get(remoteId);<a name="line.1316"></a>
+<span class="sourceLineNo">1317</span> if (connection == null) {<a
name="line.1317"></a>
+<span class="sourceLineNo">1318</span> connection =
createConnection(remoteId, this.codec, this.compressor);<a name="line.1318"></a>
+<span class="sourceLineNo">1319</span> connections.put(remoteId,
connection);<a name="line.1319"></a>
+<span class="sourceLineNo">1320</span> }<a name="line.1320"></a>
+<span class="sourceLineNo">1321</span> }<a name="line.1321"></a>
+<span class="sourceLineNo">1322</span><a name="line.1322"></a>
+<span class="sourceLineNo">1323</span> return connection;<a
name="line.1323"></a>
+<span class="sourceLineNo">1324</span> }<a name="line.1324"></a>
+<span class="sourceLineNo">1325</span>}<a name="line.1325"></a>