Author: davsclaus Date: Sun Aug 26 10:47:06 2012 New Revision: 1377413 URL: http://svn.apache.org/viewvc?rev=1377413&view=rev Log: CAMEL-5480: Quickfix engines is deferred to be started after CamelContext has been fully started. To avoid any side effects with engines firing too fast.
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1377413&r1=1377412&r2=1377413&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java Sun Aug 26 10:47:06 2012 @@ -20,7 +20,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.StartupListener; import org.apache.camel.impl.DefaultComponent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +31,7 @@ import quickfix.MessageFactory; import quickfix.MessageStoreFactory; import quickfix.SessionSettings; -public class QuickfixjComponent extends DefaultComponent { +public class QuickfixjComponent extends DefaultComponent implements StartupListener { private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class); private final Object engineInstancesLock = new Object(); @@ -77,26 +79,23 @@ public class QuickfixjComponent extends @Override protected void doStart() throws Exception { super.doStart(); - synchronized (engineInstancesLock) { - for (QuickfixjEngine engine : engines.values()) { - startQuickfixjEngine(engine); - } - } - } - - private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception { - LOG.info("Starting QuickFIX/J engine: uri=", engine.getUri()); - engine.start(); + // we defer starting quickfix engines till the onCamelContextStarted callback } @Override protected void doStop() throws Exception { - super.doStop(); + // stop engines when stopping component synchronized (engineInstancesLock) { for (QuickfixjEngine engine : engines.values()) { engine.stop(); } } + super.doStop(); + } + + private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception { + LOG.info("Starting QuickFIX/J engine: {}", engine.getUri()); + engine.start(); } // Test Support @@ -128,4 +127,13 @@ public class QuickfixjComponent extends this.configurations = configurations; } + @Override + public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception { + // only start quickfix engines when CamelContext have finished starting + synchronized (engineInstancesLock) { + for (QuickfixjEngine engine : engines.values()) { + startQuickfixjEngine(engine); + } + } + } } Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1377413&r1=1377412&r2=1377413&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Sun Aug 26 10:47:06 2012 @@ -26,6 +26,7 @@ import java.util.concurrent.CopyOnWriteA import javax.management.JMException; +import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ObjectHelper; import org.quickfixj.jmx.JmxExporter; import org.slf4j.Logger; @@ -76,7 +77,7 @@ import quickfix.UnsupportedMessageType; * The wrapper will create an initiator or acceptor or both depending on the * roles of sessions described in the settings file. */ -public class QuickfixjEngine { +public class QuickfixjEngine extends ServiceSupport { public static final String DEFAULT_START_TIME = "00:00:00"; public static final String DEFAULT_END_TIME = "00:00:00"; public static final long DEFAULT_HEARTBTINT = 30; @@ -93,10 +94,7 @@ public class QuickfixjEngine { private final LogFactory sessionLogFactory; private final MessageFactory messageFactory; private final MessageCorrelator messageCorrelator = new MessageCorrelator(); - - private boolean started; private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>(); - private final String uri; public enum ThreadModel { @@ -191,7 +189,8 @@ public class QuickfixjEngine { return new SessionSettings(inputStream); } - public void start() throws Exception { + @Override + protected void doStart() throws Exception { if (acceptor != null) { acceptor.start(); if (jmxExporter != null) { @@ -204,28 +203,25 @@ public class QuickfixjEngine { jmxExporter.register(initiator); } } - started = true; - } - - public void stop() throws Exception { - stop(forcedShutdown); } - public void stop(boolean force) throws Exception { + @Override + protected void doStop() throws Exception { if (acceptor != null) { acceptor.stop(); } if (initiator != null) { initiator.stop(); } - started = false; } - public boolean isStarted() { - return started; + @Override + protected void doShutdown() throws Exception { + // also clear event listeners + eventListeners.clear(); } - - private Initiator createInitiator(Application application, SessionSettings settings, + + private Initiator createInitiator(Application application, SessionSettings settings, MessageStoreFactory messageStoreFactory, LogFactory sessionLogFactory, MessageFactory messageFactory, ThreadModel threadModel) throws ConfigError { Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java?rev=1377413&r1=1377412&r2=1377413&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java Sun Aug 26 10:47:06 2012 @@ -26,6 +26,7 @@ import java.net.URLClassLoader; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -76,6 +77,7 @@ public class QuickfixjComponentTest { private SessionID sessionID; private SessionSettings settings; private QuickfixjComponent component; + private CamelContext camelContext; private MessageFactory engineMessageFactory; private MessageStoreFactory engineMessageStoreFactory; private LogFactory engineLogFactory; @@ -118,9 +120,10 @@ public class QuickfixjComponentTest { } private void setUpComponent(boolean injectQfjPlugins) throws IOException, MalformedURLException, NoSuchMethodException { - DefaultCamelContext camelContext = new DefaultCamelContext(); + camelContext = new DefaultCamelContext(); component = new QuickfixjComponent(); component.setCamelContext(camelContext); + camelContext.addComponent("quickfix", component); if (injectQfjPlugins) { engineMessageFactory = new DefaultMessageFactory(); @@ -144,6 +147,9 @@ public class QuickfixjComponentTest { if (component != null) { component.stop(); } + if (camelContext != null) { + camelContext.stop(); + } } @Test @@ -168,8 +174,9 @@ public class QuickfixjComponentTest { assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue())); assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false)); assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID)); - - component.start(); + + // will start the component + camelContext.start(); assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true)); // Move these too an endpoint testcase if one exists @@ -186,7 +193,8 @@ public class QuickfixjComponentTest { writeSettings(); - component.start(); + // will start the component + camelContext.start(); Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null)); assertThat(component.getEngines().size(), is(1)); @@ -229,9 +237,10 @@ public class QuickfixjComponentTest { // Endpoint automatically starts the consumer assertThat(((StatefulService)consumer).isStarted(), is(true)); - - component.start(); - + + // will start the component + camelContext.start(); + assertTrue("Session not created", latch.await(5000, TimeUnit.MILLISECONDS)); component.stop(); @@ -283,8 +292,9 @@ public class QuickfixjComponentTest { }); ServiceHelper.startService(consumer); - component.start(); - + // will start the component + camelContext.start(); + assertTrue("Session not created", logonLatch.await(5000, TimeUnit.MILLISECONDS)); Endpoint producerEndpoint = component.createEndpoint(getEndpointUri(settingsFile.getName(), acceptorSessionID));