Author: hiranya
Date: Wed Aug 14 21:35:25 2013
New Revision: 1514059
URL: http://svn.apache.org/r1514059
Log:
Handle the endOfInput event in the pass-through (PT) transport's SourceHandler;
otherwise the JMX stats get messed up.
Modified:
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java
Modified:
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
(original)
+++
synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/registry/DynamicResourceTest.java
Wed Aug 14 21:35:25 2013
@@ -101,7 +101,7 @@ public class DynamicResourceTest extends
System.out.println("Testing advanced sequence caching...");
synCtx = TestUtils.createLightweightSynapseMessageContext("<empty/>",
config);
System.out.println("Waiting for the cache to expire...");
- Thread.sleep(8500L);
+ Thread.sleep(10000L);
Mediator seq3 = synCtx.getSequence(KEY_DYNAMIC_SEQUENCE_1);
assertNotNull(seq3);
assertTrue(((SequenceMediator) seq3).isInitialized());
@@ -114,7 +114,7 @@ public class DynamicResourceTest extends
System.out.println("Testing sequence reloading...");
registry.updateResource(KEY_DYNAMIC_SEQUENCE_1,
TestUtils.createOMElement(DYNAMIC_SEQUENCE_2));
System.out.println("Waiting for the cache to expire...");
- Thread.sleep(8500L);
+ Thread.sleep(10000L);
synCtx = TestUtils.createLightweightSynapseMessageContext("<empty/>",
config);
Mediator seq4 = synCtx.getSequence(KEY_DYNAMIC_SEQUENCE_1);
assertNotNull(seq4);
@@ -158,7 +158,7 @@ public class DynamicResourceTest extends
System.out.println("Testing advanced endpoint caching...");
synCtx = TestUtils.createSynapseMessageContext("<empty/>", config);
System.out.println("Waiting for the cache to expire...");
- Thread.sleep(8500L);
+ Thread.sleep(10000L);
Endpoint ep3 = synCtx.getEndpoint(KEY_DYNAMIC_ENDPOINT_1);
assertNotNull(ep3);
assertEquals(1, registry.getHitCount());
@@ -168,7 +168,7 @@ public class DynamicResourceTest extends
System.out.println("Testing endpoint reloading...");
registry.updateResource(KEY_DYNAMIC_ENDPOINT_1,
TestUtils.createOMElement(DYNAMIC_ENDPOINT_2));
System.out.println("Waiting for the cache to expire...");
- Thread.sleep(8500L);
+ Thread.sleep(10000L);
synCtx = TestUtils.createSynapseMessageContext("<empty/>", config);
Endpoint ep4 = synCtx.getEndpoint(KEY_DYNAMIC_ENDPOINT_1);
assertNotNull(ep4);
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
Wed Aug 14 21:35:25 2013
@@ -78,7 +78,11 @@ public class SourceContext {
public void reset() {
this.request = null;
this.response = null;
- this.state = ProtocolState.REQUEST_READY;
+ if (this.state != ProtocolState.CLOSED) {
+ // if the connection is not closed yet, prepare to receive a new request
+ // on it
+ this.state = ProtocolState.REQUEST_READY;
+ }
if (writer != null) {
ByteBuffer buffer = writer.getBuffer();
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java
Wed Aug 14 21:35:25 2013
@@ -250,16 +250,31 @@ public class SourceHandler implements NH
metrics.incrementBytesSent(bytesSent);
} catch (IOException e) {
logIOException(e);
-
informWriterError(conn);
-
SourceContext.updateState(conn, ProtocolState.CLOSING);
sourceConfiguration.getSourceConnections().shutDownConnection(conn);
}
}
public void endOfInput(NHttpServerConnection conn) throws IOException {
- closed(conn);
+ ProtocolState state = SourceContext.getState(conn);
+
+ if (state == ProtocolState.REQUEST_READY || state == ProtocolState.RESPONSE_DONE) {
+ if (log.isDebugEnabled()) {
+ log.debug("Keep-Alive connection was closed at the remote end: " + conn);
+ }
+ } else if (state == ProtocolState.REQUEST_BODY || state == ProtocolState.REQUEST_HEAD) {
+ informReaderError(conn);
+ log.warn("Connection closed at the remote end while reading the request: " + conn);
+ } else if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.RESPONSE_HEAD) {
+ informWriterError(conn);
+ log.warn("Connection closed at the remote end while writing the response: " + conn);
+ } else if (state == ProtocolState.REQUEST_DONE) {
+ log.warn("Connection closed by the client after request is read: " + conn);
+ }
+
+ SourceContext.updateState(conn, ProtocolState.CLOSED);
+ sourceConfiguration.getSourceConnections().shutDownConnection(conn);
}
public void exception(NHttpServerConnection conn, Exception e) {
@@ -361,10 +376,9 @@ public class SourceHandler implements NH
try {
sourceConfiguration.getHttpProcessor().process(response,
httpContext);
conn.submitResponse(response);
- SourceContext.updateState(conn, ProtocolState.CLOSED);
- conn.close();
} catch (Exception ex) {
log.error("Error while handling HttpException", ex);
+ } finally {
SourceContext.updateState(conn, ProtocolState.CLOSED);
sourceConfiguration.getSourceConnections().shutDownConnection(conn);
}
@@ -403,12 +417,10 @@ public class SourceHandler implements NH
if (log.isDebugEnabled()) {
log.debug("Keep-Alive connection was closed: " + conn);
}
- } else if (state == ProtocolState.REQUEST_BODY ||
- state == ProtocolState.REQUEST_HEAD) {
+ } else if (state == ProtocolState.REQUEST_BODY || state == ProtocolState.REQUEST_HEAD) {
informReaderError(conn);
log.warn("Connection closed while reading the request: " + conn);
- } else if (state == ProtocolState.RESPONSE_BODY ||
- state == ProtocolState.RESPONSE_HEAD) {
+ } else if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.RESPONSE_HEAD) {
informWriterError(conn);
log.warn("Connection closed while writing the response: " + conn);
} else if (state == ProtocolState.REQUEST_DONE) {
@@ -416,9 +428,10 @@ public class SourceHandler implements NH
}
metrics.disconnected();
-
- SourceContext.updateState(conn, ProtocolState.CLOSED);
- sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+ if (state != ProtocolState.CLOSED) {
+ SourceContext.updateState(conn, ProtocolState.CLOSED);
+ sourceConfiguration.getSourceConnections().shutDownConnection(conn);
+ }
}
private void handleInvalidState(NHttpServerConnection conn, String action)
{
Modified:
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java?rev=1514059&r1=1514058&r2=1514059&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java
(original)
+++
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/jmx/LatencyView.java
Wed Aug 14 21:35:25 2013
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.Atomi
*
* <ul>
* <li>t1 - Receiving a new request (ServerHandler#requestReceived)</li>
- * <li>t2 - Obtaining a connection to forward the request
(Clienthandler#processConnection)</li>
+ * <li>t2 - Obtaining a connection to forward the request
(ClientHandler#processConnection)</li>
* <li>t3 - Reading the complete response from the backend server
(ClientHandler#inputReady)</li>
* <li>t4 - Writing the complete response to the client
(ServerHandler#outputReady)</li>
* <ul>
@@ -58,7 +58,7 @@ import java.util.concurrent.atomic.Atomi
*/
public class LatencyView implements LatencyViewMBean {
- private static final String NHTTP_LATENCY_VIEW = "NhttpTransportLatency";
+ private static final String NHTTP_LATENCY_VIEW =
"PassThroughTransportLatency";
private static final int SMALL_DATA_COLLECTION_PERIOD = 5;
private static final int LARGE_DATA_COLLECTION_PERIOD = 5 * 60;
@@ -124,7 +124,7 @@ public class LatencyView implements Late
*
* @param reqArrival The request arrival time
* @param reqDeparture The request departure time (backend connection
establishment)
- * @param resArrival The resoponse arrival time
+ * @param resArrival The response arrival time
* @param resDeparture The response departure time
*/
public void notifyTimes(long reqArrival, long reqDeparture,