Author: remm
Date: Fri May 18 09:39:26 2018
New Revision: 1831839
URL: http://svn.apache.org/viewvc?rev=1831839&view=rev
Log:
As read and write are symmetric, remove code duplication
Modified:
tomcat/trunk/ (props changed)
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
Propchange: tomcat/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri May 18 09:39:26 2018
@@ -1,2 +1,3 @@
/tomcat/tc8.0.x/trunk:1809644
/tomcat/tc8.5.x/trunk:1802799,1808880,1809646
+/tomcat/trunk:1816751-1816762
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1831839&r1=1831838&r2=1831839&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Fri May 18
09:39:26 2018
@@ -862,6 +862,7 @@ public class Nio2Endpoint extends Abstra
* Internal state tracker for scatter/gather operations.
*/
private static class OperationState<A> {
+ private final boolean read;
private final ByteBuffer[] buffers;
private final int offset;
private final int length;
@@ -871,9 +872,12 @@ public class Nio2Endpoint extends Abstra
private final BlockingMode block;
private final CompletionCheck check;
private final CompletionHandler<Long, ? super A> handler;
- private OperationState(ByteBuffer[] buffers, int offset, int
length,
+ private final Semaphore semaphore;
+ private OperationState(boolean read, ByteBuffer[] buffers, int
offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A
attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A>
handler) {
+ CompletionCheck check, CompletionHandler<Long, ? super A>
handler,
+ Semaphore semaphore) {
+ this.read = read;
this.buffers = buffers;
this.offset = offset;
this.length = length;
@@ -883,6 +887,7 @@ public class Nio2Endpoint extends Abstra
this.attachment = attachment;
this.check = check;
this.handler = handler;
+ this.semaphore = semaphore;
}
private volatile long nBytes = 0;
private volatile CompletionState state = CompletionState.PENDING;
@@ -915,8 +920,9 @@ public class Nio2Endpoint extends Abstra
return CompletionState.NOT_DONE;
}
}
- OperationState<A> state = new OperationState<>(dsts, offset,
length, block, timeout, unit, attachment, check, handler);
- ScatterReadCompletionHandler<A> completion = new
ScatterReadCompletionHandler<>();
+ OperationState<A> state = new OperationState<>(true, dsts, offset,
length, block,
+ timeout, unit, attachment, check, handler, readPending);
+ VectoredIOCompletionHandler<A> completion = new
VectoredIOCompletionHandler<>();
Nio2Endpoint.startInline();
long nBytes = 0;
if (!socketBufferHandler.isReadBufferEmpty()) {
@@ -954,84 +960,6 @@ public class Nio2Endpoint extends Abstra
return state.state;
}
- private class ScatterReadCompletionHandler<A> implements
CompletionHandler<Long, OperationState<A>> {
- @Override
- public void completed(Long nBytes, OperationState<A> state) {
- if (nBytes.longValue() < 0) {
- failed(new EOFException(), state);
- } else {
- state.nBytes += nBytes.longValue();
- CompletionState currentState = Nio2Endpoint.isInline() ?
CompletionState.INLINE : CompletionState.DONE;
- boolean complete = true;
- boolean completion = true;
- if (state.check != null) {
- switch (state.check.callHandler(currentState,
state.buffers, state.offset, state.length)) {
- case CONTINUE:
- complete = false;
- break;
- case DONE:
- break;
- case NONE:
- completion = false;
- break;
- }
- }
- if (complete) {
- boolean notify = false;
- readPending.release();
- if (state.block == BlockingMode.BLOCK && currentState
!= CompletionState.INLINE) {
- notify = true;
- } else {
- state.state = currentState;
- }
- if (completion && state.handler != null) {
-
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
- }
- if (notify) {
- synchronized (state) {
- state.state = currentState;
- state.notify();
- }
- }
- } else {
- getSocket().read(state.buffers, state.offset,
state.length,
- state.timeout, state.unit, state, this);
- }
- }
- }
- @Override
- public void failed(Throwable exc, OperationState<A> state) {
- IOException ioe;
- if (exc instanceof IOException) {
- ioe = (IOException) exc;
- } else {
- ioe = new IOException(exc);
- }
- setError(ioe);
- boolean notify = false;
- readPending.release();
- readPending.release();
- if (state.block == BlockingMode.BLOCK) {
- notify = true;
- } else {
- state.state = Nio2Endpoint.isInline() ?
CompletionState.ERROR : CompletionState.DONE;
- }
- if (exc instanceof AsynchronousCloseException) {
- // If already closed, don't call onError and close again
- return;
- }
- if (state.handler != null) {
- state.handler.failed(ioe, state.attachment);
- }
- if (notify) {
- synchronized (state) {
- state.state = Nio2Endpoint.isInline() ?
CompletionState.ERROR : CompletionState.DONE;
- state.notify();
- }
- }
- }
- }
-
@Override
public <A> CompletionState write(ByteBuffer[] srcs, int offset, int
length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
@@ -1068,8 +996,9 @@ public class Nio2Endpoint extends Abstra
return CompletionState.ERROR;
}
}
- OperationState<A> state = new OperationState<>(srcs, offset,
length, block, timeout, unit, attachment, check, handler);
- GatherWriteCompletionHandler<A> completion = new
GatherWriteCompletionHandler<>();
+ OperationState<A> state = new OperationState<>(false, srcs,
offset, length, block,
+ timeout, unit, attachment, check, handler, writePending);
+ VectoredIOCompletionHandler<A> completion = new
VectoredIOCompletionHandler<>();
Nio2Endpoint.startInline();
// It should be less necessary to check the buffer state as it is
easy to flush before
getSocket().write(srcs, offset, length, timeout, unit, state,
completion);
@@ -1092,7 +1021,7 @@ public class Nio2Endpoint extends Abstra
return state.state;
}
- private class GatherWriteCompletionHandler<A> implements
CompletionHandler<Long, OperationState<A>> {
+ private class VectoredIOCompletionHandler<A> implements
CompletionHandler<Long, OperationState<A>> {
@Override
public void completed(Long nBytes, OperationState<A> state) {
if (nBytes.longValue() < 0) {
@@ -1116,7 +1045,7 @@ public class Nio2Endpoint extends Abstra
}
if (complete) {
boolean notify = false;
- writePending.release();
+ state.semaphore.release();
if (state.block == BlockingMode.BLOCK && currentState
!= CompletionState.INLINE) {
notify = true;
} else {
@@ -1132,8 +1061,13 @@ public class Nio2Endpoint extends Abstra
}
}
} else {
- getSocket().write(state.buffers, state.offset,
state.length,
- state.timeout, state.unit, state, this);
+ if (state.read) {
+ getSocket().read(state.buffers, state.offset,
state.length,
+ state.timeout, state.unit, state, this);
+ } else {
+ getSocket().write(state.buffers, state.offset,
state.length,
+ state.timeout, state.unit, state, this);
+ }
}
}
}
@@ -1147,7 +1081,7 @@ public class Nio2Endpoint extends Abstra
}
setError(ioe);
boolean notify = false;
- writePending.release();
+ state.semaphore.release();
if (state.block == BlockingMode.BLOCK) {
notify = true;
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]