Author: asankha
Date: Thu Dec 13 11:08:06 2007
New Revision: 603986
URL: http://svn.apache.org/viewvc?rev=603986&view=rev
Log:
basic JMX support for transports - nhttp/s
ability to pause() or resume() a transport using HTTPCORE-127
Added:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/ManagementSupport.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/MetricsCollector.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportView.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportViewMBean.java
Modified:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseConstants.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
webservices/synapse/trunk/java/pom.xml
Modified:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseConstants.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseConstants.java?rev=603986&r1=603985&r2=603986&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseConstants.java
(original)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseConstants.java
Thu Dec 13 11:08:06 2007
@@ -23,6 +23,10 @@
import javax.xml.namespace.QName;
public class BaseConstants {
+ // -- status of a transport --
+ public final static int STOPPED = 0;
+ public final static int STARTED = 1;
+ public final static int PAUSED = 2;
/**
* The JMS message property specifying the SOAP Action
Added:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/ManagementSupport.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/ManagementSupport.java?rev=603986&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/ManagementSupport.java
(added)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/ManagementSupport.java
Thu Dec 13 11:08:06 2007
@@ -0,0 +1,34 @@
+package org.apache.synapse.transport.base;
+
+import org.apache.axis2.AxisFault;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+/**
+ * Management contract of a transport: lifecycle controls plus message, fault
+ * and byte counters for the receiving and the sending direction.
+ */
+public interface ManagementSupport {
+
+    // -- lifecycle controls --
+
+    /**
+     * Pauses the transport.
+     *
+     * @throws AxisFault if the transport cannot be paused
+     */
+    void pause() throws AxisFault;
+
+    /**
+     * Resumes a previously paused transport.
+     *
+     * @throws AxisFault if the transport cannot be resumed
+     */
+    void resume() throws AxisFault;
+
+    /**
+     * Shuts the transport down for maintenance.
+     *
+     * @param millis time allowed for the shutdown, in milliseconds
+     * @throws AxisFault if the shutdown fails
+     */
+    void maintenenceShutdown(long millis) throws AxisFault;
+
+    // -- receive side counters --
+
+    /** @return the number of messages received */
+    long getMessagesReceived();
+
+    /** @return the number of faults encountered while receiving */
+    long getFaultsReceiving();
+
+    /** @return the number of bytes received */
+    long getBytesReceived();
+
+    // -- send side counters --
+
+    /** @return the number of messages sent */
+    long getMessagesSent();
+
+    /** @return the number of faults encountered while sending */
+    long getFaultsSending();
+
+    /** @return the number of bytes sent */
+    long getBytesSent();
+}
Added:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/MetricsCollector.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/MetricsCollector.java?rev=603986&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/MetricsCollector.java
(added)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/MetricsCollector.java
Thu Dec 13 11:08:06 2007
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.base;
+
+/**
+ * Collects metrics related to a transport that has metrics support enabled.
+ *
+ * <p>Every counter is guarded by this object's monitor: all increment methods,
+ * all getters and {@link #reset()} are synchronized. This gives readers a
+ * consistent, up-to-date value of each 64-bit counter and makes a reset atomic
+ * with respect to concurrent increments.
+ */
+public class MetricsCollector {
+
+    // -- receive side counters (guarded by "this") --
+    private long messagesReceived;
+    private long faultsReceiving;
+    private long timeoutsReceiving;
+    private long bytesReceived;
+
+    // -- send side counters (guarded by "this") --
+    private long messagesSent;
+    private long faultsSending;
+    private long timeoutsSending;
+    private long bytesSent;
+
+    /**
+     * Sets every counter back to zero in one atomic step.
+     */
+    public synchronized void reset() {
+        messagesReceived = 0;
+        faultsReceiving = 0;
+        timeoutsReceiving = 0;
+        bytesReceived = 0;
+        messagesSent = 0;
+        faultsSending = 0;
+        timeoutsSending = 0;
+        bytesSent = 0;
+    }
+
+    /** @return the number of messages received */
+    public synchronized long getMessagesReceived() {
+        return messagesReceived;
+    }
+
+    /** @return the number of faults recorded while receiving */
+    public synchronized long getFaultsReceiving() {
+        return faultsReceiving;
+    }
+
+    /** @return the number of timeouts recorded while receiving */
+    public synchronized long getTimeoutsReceiving() {
+        return timeoutsReceiving;
+    }
+
+    /** @return the number of bytes received */
+    public synchronized long getBytesReceived() {
+        return bytesReceived;
+    }
+
+    /** @return the number of messages sent */
+    public synchronized long getMessagesSent() {
+        return messagesSent;
+    }
+
+    /** @return the number of faults recorded while sending */
+    public synchronized long getFaultsSending() {
+        return faultsSending;
+    }
+
+    /** @return the number of timeouts recorded while sending */
+    public synchronized long getTimeoutsSending() {
+        return timeoutsSending;
+    }
+
+    /** @return the number of bytes sent */
+    public synchronized long getBytesSent() {
+        return bytesSent;
+    }
+
+    /** Records one received message. */
+    public synchronized void incrementMessagesReceived() {
+        messagesReceived++;
+    }
+
+    /** Records one fault on the receive side. */
+    public synchronized void incrementFaultsReceiving() {
+        faultsReceiving++;
+    }
+
+    /** Records one timeout on the receive side. */
+    public synchronized void incrementTimeoutsReceiving() {
+        timeoutsReceiving++;
+    }
+
+    /**
+     * Adds to the received byte count.
+     *
+     * @param size number of bytes to add
+     */
+    public synchronized void incrementBytesReceived(int size) {
+        bytesReceived += size;
+    }
+
+    /** Records one sent message. */
+    public synchronized void incrementMessagesSent() {
+        messagesSent++;
+    }
+
+    /** Records one fault on the send side. */
+    public synchronized void incrementFaultsSending() {
+        faultsSending++;
+    }
+
+    /** Records one timeout on the send side. */
+    public synchronized void incrementTimeoutsSending() {
+        timeoutsSending++;
+    }
+
+    /**
+     * Adds to the sent byte count.
+     *
+     * @param size number of bytes to add
+     */
+    public synchronized void incrementBytesSent(int size) {
+        bytesSent += size;
+    }
+}
Added:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportView.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportView.java?rev=603986&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportView.java
(added)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportView.java
Thu Dec 13 11:08:06 2007
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.base;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.transport.TransportSender;
+
+public class TransportView implements TransportViewMBean {
+
+ private static final Log log = LogFactory.getLog(TransportView.class);
+
+ public static final int STOPPED = 0;
+ public static final int RUNNING = 1;
+ public static final int PAUSED = 2;
+ public static final int SHUTTING_DOWN = 3;
+
+ private TransportListener listener = null;
+ private TransportSender sender = null;
+
+ public TransportView(TransportListener listener, TransportSender sender) {
+ this.listener = listener;
+ this.sender = sender;
+ }
+
+ // JMX Attributes
+ public long getMessagesReceived() {
+ if (listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getMessagesReceived();
+ }
+ return -1;
+ }
+
+ public long getFaultsReceiving() {
+ if (listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getFaultsReceiving();
+ }
+ return -1;
+ }
+
+ public long getBytesReceived() {
+ if (listener instanceof ManagementSupport) {
+ return ((ManagementSupport) listener).getBytesReceived();
+ }
+ return -1;
+ }
+
+ public long getMessagesSent() {
+ if (sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getMessagesSent();
+ }
+ return -1;
+ }
+
+ public long getFaultsSending() {
+ if (sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getFaultsSending();
+ }
+ return -1;
+ }
+
+ public long getBytesSent() {
+ if (sender instanceof ManagementSupport) {
+ return ((ManagementSupport) sender).getBytesSent();
+ }
+ return -1;
+ }
+
+ // JMX Operations
+ public void start() throws Exception{
+ listener.start();
+ }
+
+ public void stop() throws Exception {
+ listener.stop();
+ }
+
+ public void pause() throws Exception {
+ if (listener instanceof ManagementSupport) {
+ ((ManagementSupport) listener).pause();
+ }
+ }
+
+ public void resume() throws Exception {
+ if (listener instanceof ManagementSupport) {
+ ((ManagementSupport) listener).resume();
+ }
+ }
+
+ public void maintenenceShutdown(long seconds) throws Exception {
+ if (listener instanceof ManagementSupport) {
+ ((ManagementSupport) listener).maintenenceShutdown(seconds * 1000);
+ }
+ }
+
+ public void resetStatistics() {
+ log.info("Operation not supported over JMX");
+ }
+}
Added:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportViewMBean.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportViewMBean.java?rev=603986&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportViewMBean.java
(added)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/TransportViewMBean.java
Thu Dec 13 11:08:06 2007
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.base;
+
+/**
+ * Standard MBean interface of a transport: read-only counters exposed as JMX
+ * attributes and lifecycle controls exposed as JMX operations.
+ */
+public interface TransportViewMBean {
+
+    // JMX Attributes
+
+    /** @return the number of messages received */
+    long getMessagesReceived();
+
+    /** @return the number of faults encountered while receiving */
+    long getFaultsReceiving();
+
+    /** @return the number of messages sent */
+    long getMessagesSent();
+
+    /** @return the number of faults encountered while sending */
+    long getFaultsSending();
+
+    /** @return the number of bytes received */
+    long getBytesReceived();
+
+    /** @return the number of bytes sent */
+    long getBytesSent();
+
+    // JMX Operations
+
+    /** Starts the transport. */
+    void start() throws Exception;
+
+    /** Stops the transport. */
+    void stop() throws Exception;
+
+    /** Pauses the transport. */
+    void pause() throws Exception;
+
+    /** Resumes a paused transport. */
+    void resume() throws Exception;
+
+    /**
+     * Shuts the transport down for maintenance.
+     *
+     * @param seconds time allowed for the shutdown, in seconds
+     */
+    void maintenenceShutdown(long seconds) throws Exception;
+
+    /** Resets the collected statistics. */
+    void resetStatistics();
+}
Modified:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=603986&r1=603985&r2=603986&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
(original)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
Thu Dec 13 11:08:06 2007
@@ -22,8 +22,10 @@
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.lang.management.ManagementFactory;
import javax.net.ssl.SSLContext;
+import javax.management.*;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
@@ -39,17 +41,20 @@
import org.apache.http.impl.nio.reactor.SSLIOSessionHandler;
import org.apache.http.nio.NHttpServiceHandler;
import org.apache.http.nio.reactor.IOEventDispatch;
-import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
+import org.apache.synapse.transport.base.ManagementSupport;
+import org.apache.synapse.transport.base.MetricsCollector;
+import org.apache.synapse.transport.base.BaseConstants;
+import org.apache.synapse.transport.base.TransportView;
/**
* NIO transport listener for Axis2 based on HttpCore and NIO extensions
*/
-public class HttpCoreNIOListener implements TransportListener {
+public class HttpCoreNIOListener implements TransportListener,
ManagementSupport {
private static final Log log =
LogFactory.getLog(HttpCoreNIOListener.class);
@@ -68,6 +73,10 @@
private SSLContext sslContext = null;
/** The SSL session handler that manages client authentication etc */
private SSLIOSessionHandler sslIOSessionHandler = null;
+ /** Metrics collector for this transport */
+ private MetricsCollector metrics = new MetricsCollector();
+ /** state of the listener */
+ private int state = BaseConstants.STOPPED;
/**
* configure and start the IO reactor on the specified port
@@ -96,10 +105,10 @@
log.error("Error starting the IOReactor", e);
}
- NHttpServiceHandler handler = new ServerHandler(cfgCtx, params,
sslContext != null);
+ NHttpServiceHandler handler = new ServerHandler(cfgCtx, params,
sslContext != null, metrics);
IOEventDispatch ioEventDispatch = getEventDispatch(
handler, sslContext, sslIOSessionHandler, params);
-
+ state = BaseConstants.STARTED;
try {
ioReactor.listen(new InetSocketAddress(port));
ioReactor.execute(ioEventDispatch);
@@ -172,6 +181,19 @@
} else {
serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, host, port);
}
+
+ // register with JMX
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName name = null;
+ try {
+ name = new
ObjectName("org.apache.axis2:Type=Transport,ConnectorName=" +
+ "nio-http" + (sslContext == null ? "" : "s"));
+ TransportView tBean = new TransportView(this, null);
+ mbs.registerMBean(tBean, name);
+ } catch (Exception e) {
+ handleException("Error registering the non-blocking http" +
+ (sslContext == null ? "" : "s") + " transport for JMX
management", e);
+ }
}
/**
@@ -244,8 +266,10 @@
* @throws AxisFault on error
*/
public void stop() throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
try {
ioReactor.shutdown();
+ state = BaseConstants.STOPPED;
log.info("Listener shut down");
} catch (IOException e) {
handleException("Error shutting down IOReactor", e);
@@ -253,6 +277,58 @@
}
/**
+ * Pause the listener - stops accepting new connections, but continues processing existing
+ * connections until they complete. This helps bring an instance into a maintenance mode
+ * @throws AxisFault
+ */
+ public void pause() throws AxisFault {
+     // only a started listener can be paused; any other state is left untouched
+     if (state == BaseConstants.STARTED) {
+         try {
+             ioReactor.pause();
+             // the state changes only after the reactor accepted the pause
+             state = BaseConstants.PAUSED;
+             log.info("Listener paused");
+         } catch (IOException e) {
+             handleException("Error pausing IOReactor", e);
+         }
+     }
+ }
+
+ /**
+ * Resume the listener - brings the listener back into active mode from a paused state
+ * @throws AxisFault
+ */
+ public void resume() throws AxisFault {
+     // only a paused listener can be resumed; any other state is left untouched
+     if (state == BaseConstants.PAUSED) {
+         try {
+             ioReactor.resume();
+             // the state changes only after the reactor accepted the resume
+             state = BaseConstants.STARTED;
+             log.info("Listener resumed");
+         } catch (IOException e) {
+             handleException("Error resuming IOReactor", e);
+         }
+     }
+ }
+
+ /**
+ * Stop accepting new connections, and wait the maximum specified time for in-flight
+ * requests to complete before a controlled shutdown for maintenance
+ *
+ * @param millis the maximum number of milliseconds to wait for pending requests to complete
+ * @throws AxisFault
+ */
+ public void maintenenceShutdown(long millis) throws AxisFault {
+ if (state != BaseConstants.STARTED) return;
+ try {
+ long start = System.currentTimeMillis();
+ ioReactor.pause();
+ ioReactor.shutdown(millis);
+ state = BaseConstants.STOPPED;
+ log.info("Listener shutdown in : " + (System.currentTimeMillis() -
start) / 1000 + "s");
+ } catch (IOException e) {
+ handleException("Error shutting down the IOReactor for
maintenence", e);
+ }
+ }
+
+
+ /**
* Return the EPR for the given service (implements deprecated method
temporarily)
*/
public EndpointReference getEPRForService(String serviceName, String ip)
throws AxisFault {
@@ -297,4 +373,37 @@
throw new AxisFault(msg);
}
+ // -- jmx/management methods--
+ /** @return the received message count from the metrics collector, or -1 if there is no collector */
+ public long getMessagesReceived() {
+     return (metrics == null) ? -1 : metrics.getMessagesReceived();
+ }
+
+ /** @return the receive-side fault count from the metrics collector, or -1 if there is no collector */
+ public long getFaultsReceiving() {
+     return (metrics == null) ? -1 : metrics.getFaultsReceiving();
+ }
+
+ /** @return the received byte count from the metrics collector, or -1 if there is no collector */
+ public long getBytesReceived() {
+     return (metrics == null) ? -1 : metrics.getBytesReceived();
+ }
+
+ /** @return always -1; this listener reports no sent-message count */
+ public long getMessagesSent() {
+     return -1L;
+ }
+
+ /** @return always -1; this listener reports no send-side fault count */
+ public long getFaultsSending() {
+     return -1L;
+ }
+
+ /** @return always -1; this listener reports no sent byte count */
+ public long getBytesSent() {
+     return -1L;
+ }
}
Modified:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=603986&r1=603985&r2=603986&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
(original)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
Thu Dec 13 11:08:06 2007
@@ -22,6 +22,7 @@
import org.apache.synapse.transport.nhttp.util.PipeImpl;
import org.apache.synapse.transport.nhttp.util.WorkerPool;
import org.apache.synapse.transport.nhttp.util.WorkerPoolFactory;
+import org.apache.synapse.transport.base.MetricsCollector;
import org.apache.http.*;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ByteArrayEntity;
@@ -70,6 +71,8 @@
/** the thread pool to process requests */
private WorkerPool workerPool = null;
+ /** the metrics collector */
+ private MetricsCollector metrics = null;
private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
private static final String RESPONSE_SOURCE_CHANNEL =
"response-source-channel";
@@ -77,11 +80,12 @@
private static final String RESPONSE_BUFFER = "response-buffer";
public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams
params,
- final boolean isHttps) {
+ final boolean isHttps, final MetricsCollector metrics) {
super();
this.cfgCtx = cfgCtx;
this.params = params;
this.isHttps = isHttps;
+ this.metrics = metrics;
this.responseFactory = new DefaultHttpResponseFactory();
this.httpProcessor = getHttpProcessor();
this.connStrategy = new DefaultConnectionReuseStrategy();
@@ -131,16 +135,22 @@
// hand off processing of the request to a thread off the pool
workerPool.execute(
- new ServerWorker(cfgCtx, conn, isHttps, this,
+ new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
request, Channels.newInputStream(requestPipe.source()),
response, Channels.newOutputStream(responsePipe.sink())));
} catch (IOException e) {
handleException("Error processing request received for : " +
request.getRequestLine().getUri(), e, conn);
+ if (metrics != null) {
+ metrics.incrementFaultsReceiving();
+ }
} catch (Exception e) {
handleException("Error processing request received for : " +
request.getRequestLine().getUri(), e, conn);
+ if (metrics != null) {
+ metrics.incrementFaultsReceiving();
+ }
}
}
@@ -159,6 +169,9 @@
while (decoder.read(inbuf) > 0) {
inbuf.flip();
sink.write(inbuf);
+ if (metrics != null) {
+ metrics.incrementBytesReceived(inbuf.position());
+ }
inbuf.compact();
}
@@ -241,16 +254,23 @@
}
} else {
log.warn("Connection Timeout");
+ if (metrics != null) {
+ metrics.incrementTimeoutsReceiving();
+ }
}
shutdownConnection(conn);
}
public void connected(final NHttpServerConnection conn) {
- log.trace("New incoming connection");
+ if (log.isTraceEnabled()) {
+ log.trace("New incoming connection");
+ }
}
public void closed(final NHttpServerConnection conn) {
- log.trace("Connection closed");
+ if (log.isTraceEnabled()) {
+ log.trace("Connection closed");
+ }
}
/**
Modified:
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java?rev=603986&r1=603985&r2=603986&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
(original)
+++
webservices/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
Thu Dec 13 11:08:06 2007
@@ -54,6 +54,7 @@
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.protocol.HTTP;
import org.apache.ws.commons.schema.XmlSchema;
+import org.apache.synapse.transport.base.MetricsCollector;
/**
* Processes an incoming request through Axis2. An instance of this class
would be created to
@@ -81,6 +82,8 @@
private InputStream is = null;
/** the output stream to write the response message body */
private OutputStream os = null;
+ /** the metrics collector */
+ private MetricsCollector metrics = null;
private static final String SOAPACTION = "SOAPAction";
private static final String LOCATION = "Location";
private static final String CONTENT_TYPE = "Content-Type";
@@ -102,6 +105,7 @@
*/
public ServerWorker(final ConfigurationContext cfgCtx, final
NHttpServerConnection conn,
final boolean isHttps,
+ final MetricsCollector metrics,
final ServerHandler serverHandler,
final HttpRequest request, final InputStream is,
final HttpResponse response, final OutputStream os) {
@@ -109,6 +113,7 @@
this.cfgCtx = cfgCtx;
this.conn = conn;
this.isHttps = isHttps;
+ this.metrics = metrics;
this.serverHandler = serverHandler;
this.request = request;
this.response = response;
@@ -222,6 +227,9 @@
(contentType != null ? contentType.getValue() : null),
(soapAction != null ? soapAction.getValue() : null),
request.getRequestLine().getUri());
+ if (metrics != null) {
+ metrics.incrementMessagesReceived();
+ }
} catch (AxisFault e) {
handleException("Error processing POST request ", e);
}
@@ -421,6 +429,9 @@
request.getRequestLine().getUri(),
cfgCtx,
parameters);
+ if (metrics != null) {
+ metrics.incrementMessagesReceived();
+ }
// do not let the output stream close (as by default
below) since
// we are serving this GET request through the Synapse
engine
return;
@@ -443,6 +454,10 @@
private void handleException(String msg, Exception e) {
+ if (metrics != null) {
+ metrics.incrementFaultsReceiving();
+ }
+
if (e == null) {
log.error(msg);
} else {
Modified: webservices/synapse/trunk/java/pom.xml
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/pom.xml?rev=603986&r1=603985&r2=603986&view=diff
==============================================================================
--- webservices/synapse/trunk/java/pom.xml (original)
+++ webservices/synapse/trunk/java/pom.xml Thu Dec 13 11:08:06 2007
@@ -1045,7 +1045,7 @@
<properties>
<!-- Synapse and related components -->
<synapse.version>1.1.1-SNAPSHOT</synapse.version>
- <httpcore.nio.version>4.0-alpha6</httpcore.nio.version>
+ <httpcore.nio.version>4.0-alpha7-SNAPSHOT</httpcore.nio.version>
<commons.dbcp.version>1.2.2</commons.dbcp.version>
<commons.pool.version>1.3</commons.pool.version>
<commons.vfs.version>1.1-587797</commons.vfs.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]