Author: hiranya
Date: Tue May 12 08:00:27 2009
New Revision: 773818
URL: http://svn.apache.org/viewvc?rev=773818&view=rev
Log:
Enhancements and code cleanup in the FIX transport:
* The FIX sender now has its own worker pool and hence no longer relies on the
FIX listener. Therefore the listener and the sender can be enabled independently
* Made FIXSessionFactory a singleton to effectively share session data among
the listener and the sender
* Added cleanup logic to stop and dispose of initiators during sender shutdown
* Fixed a minor bug in FIXSessionFactory which prevented sample 259
and similar scenarios from operating properly
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
Tue May 12 08:00:27 2009
@@ -27,15 +27,12 @@
public class FIXApplicationFactory {
private ConfigurationContext cfgCtx;
- private WorkerPool workerPool;
-
- public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool
workerPool) {
+ public FIXApplicationFactory(ConfigurationContext cfgCtx) {
this.cfgCtx = cfgCtx;
- this.workerPool = workerPool;
}
- public Application getFIXApplication(AxisService service, boolean
acceptor) {
+ public Application getFIXApplication(AxisService service, WorkerPool
workerPool, boolean acceptor) {
return new FIXIncomingMessageHandler(cfgCtx, workerPool, service,
acceptor);
}
}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
Tue May 12 08:00:27 2009
@@ -23,6 +23,7 @@
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.transport.base.BaseUtils;
+import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import quickfix.*;
@@ -65,16 +66,29 @@
/** A Map containing all the FIX applications created for initiators,
keyed by FIX EPR */
private Map<String, Application> applicationStore;
/** An ApplicationFactory handles creating FIX Applications
(FIXIncomingMessageHandler Objects) */
- private FIXApplicationFactory applicationFactory;
+ private static FIXApplicationFactory applicationFactory = null;
+
+ private WorkerPool listenerThreadPool;
+ private WorkerPool senderThreadPool;
private Log log;
- public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
- this.applicationFactory = applicationFactory;
+ private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
+
+ public static FIXSessionFactory getInstance(FIXApplicationFactory af) {
+ if (applicationFactory == null) {
+ applicationFactory = af;
+ }
+ return INSTANCE;
+ }
+
+ private FIXSessionFactory() {
this.log = LogFactory.getLog(this.getClass());
this.acceptorStore = new HashMap<String,Acceptor>();
this.initiatorStore = new HashMap<String, Initiator>();
this.applicationStore = new HashMap<String, Application>();
+ this.listenerThreadPool = null;
+ this.senderThreadPool = null;
}
/**
@@ -101,7 +115,7 @@
MessageFactory messageFactory = new DefaultMessageFactory();
quickfix.LogFactory logFactory = getLogFactory(service,
settings, true);
//Get a new FIX Application
- Application messageHandler =
applicationFactory.getFIXApplication(service, true);
+ Application messageHandler =
applicationFactory.getFIXApplication(service, listenerThreadPool, true);
//Create a new FIX Acceptor
Acceptor acceptor = new SocketAcceptor(
messageHandler,
@@ -174,7 +188,7 @@
MessageStoreFactory storeFactory = getMessageStoreFactory(service,
settings, false);
MessageFactory messageFactory = new DefaultMessageFactory();
//Get a new FIX application
- Application messageHandler =
applicationFactory.getFIXApplication(service, false);
+ Application messageHandler =
applicationFactory.getFIXApplication(service, senderThreadPool, false);
try {
//Create a new FIX initiator
@@ -216,7 +230,7 @@
MessageFactory messageFactory = new DefaultMessageFactory();
quickfix.LogFactory logFactory = getLogFactory(service,
settings, true);
//Get a new FIX Application
- Application messageHandler =
applicationFactory.getFIXApplication(service, false);
+ Application messageHandler =
applicationFactory.getFIXApplication(service, senderThreadPool, false);
Initiator initiator = new SocketInitiator(
messageHandler,
@@ -246,10 +260,10 @@
}
} else {
- String msg = "The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM
+ " parameter is " +
- "not specified. Unable to initialize the initiator session
at this stage.";
- log.info(msg);
- throw new AxisFault(msg);
+ // FIX initiator session is not configured
+ // It could be intentional - So not an error (we don't need
initiators at all times)
+ log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + "
parameter is " +
+ "not specified. Unable to initialize the initiator session
at this stage.");
}
}
@@ -276,6 +290,24 @@
}
/**
+ * Stops all the FIX initiators created so far and cleans up all the
mappings
+ * related to them
+ */
+ public void disposeFIXInitiators() {
+ boolean debugEnabled = log.isDebugEnabled();
+
+ for (String key : initiatorStore.keySet()) {
+ initiatorStore.get(key).stop();
+ if (debugEnabled) {
+ log.debug("FIX initiator to the EPR " + key + " stopped");
+ }
+ }
+
+ initiatorStore.clear();
+ applicationStore.clear();
+ }
+
+ /**
* Returns an array of Strings representing EPRs for the specified service
*
* @param serviceName the name of the service
@@ -444,6 +476,14 @@
}
return app;
}
+
+ public void setListenerThreadPool(WorkerPool listenerThreadPool) {
+ this.listenerThreadPool = listenerThreadPool;
+ }
+
+ public void setSenderThreadPool(WorkerPool senderThreadPool) {
+ this.senderThreadPool = senderThreadPool;
+ }
}
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
Tue May 12 08:00:27 2009
@@ -60,14 +60,8 @@
TransportInDescription trpInDesc) throws AxisFault {
super.init(cfgCtx, trpInDesc);
- //initialize the FIXSessionFactory
- fixSessionFactory = new FIXSessionFactory(
- new FIXApplicationFactory(this.cfgCtx, this.workerPool));
- FIXTransportSender sender = (FIXTransportSender) cfgCtx.
-
getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
- if (sender != null) {
- sender.setSessionFactory(fixSessionFactory);
- }
+ fixSessionFactory = FIXSessionFactory.getInstance(new
FIXApplicationFactory(cfgCtx));
+ fixSessionFactory.setListenerThreadPool(this.workerPool);
log.info("FIX transport listener initialized...");
}
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
Tue May 12 08:00:27 2009
@@ -28,6 +28,8 @@
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.AbstractTransportSender;
import org.apache.axis2.transport.base.BaseUtils;
+import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
import org.apache.commons.logging.LogFactory;
import quickfix.*;
import quickfix.field.*;
@@ -51,17 +53,12 @@
private FIXSessionFactory sessionFactory;
private FIXOutgoingMessageHandler messageSender;
+ private WorkerPool workerPool;
public FIXTransportSender() {
this.log = LogFactory.getLog(this.getClass());
}
-
- public void setSessionFactory(FIXSessionFactory sessionFactory) {
- this.sessionFactory = sessionFactory;
- this.messageSender.setSessionFactory(sessionFactory);
- }
-
/**
* @param cfgCtx the axis2 configuration context
* @param transportOut the Out Transport description
@@ -69,10 +66,25 @@
*/
public void init(ConfigurationContext cfgCtx, TransportOutDescription
transportOut) throws AxisFault {
super.init(cfgCtx, transportOut);
+ this.sessionFactory = FIXSessionFactory.getInstance(new
FIXApplicationFactory(cfgCtx));
+ this.workerPool = WorkerPoolFactory.getWorkerPool(
+ 10, 20, 5, -1, "FIX Sender Worker thread group",
"FIX-Worker");
+ this.sessionFactory.setSenderThreadPool(this.workerPool);
messageSender = new FIXOutgoingMessageHandler();
+ messageSender.setSessionFactory(this.sessionFactory);
log.info("FIX transport sender initialized...");
}
+ public void stop() {
+ try {
+ this.workerPool.shutdown(10000);
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted while waiting for worker pool to shut
down");
+ }
+ sessionFactory.disposeFIXInitiators();
+ super.stop();
+ }
+
/**
* Performs the actual sending of the message.
*