Author: davsclaus
Date: Sun Aug 26 10:49:41 2012
New Revision: 1377416

URL: http://svn.apache.org/viewvc?rev=1377416&view=rev
Log:
CAMEL-5480: Quickfix engines is deferred to be started after CamelContext has 
been fully started. To avoid any side effects with engines firing too fast.

Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
    
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
    
camel/branches/camel-2.9.x/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1377413
  Merged /camel/branches/camel-2.10.x:r1377415

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1377416&r1=1377415&r2=1377416&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
 Sun Aug 26 10:49:41 2012
@@ -22,7 +22,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.StartupListener;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.slf4j.Logger;
@@ -32,7 +34,7 @@ import quickfix.MessageFactory;
 import quickfix.MessageStoreFactory;
 import quickfix.SessionSettings;
 
-public class QuickfixjComponent extends DefaultComponent {
+public class QuickfixjComponent extends DefaultComponent implements 
StartupListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(QuickfixjComponent.class);
 
     private final Object engineInstancesLock = new Object();
@@ -80,26 +82,23 @@ public class QuickfixjComponent extends 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        synchronized (engineInstancesLock) {
-            for (QuickfixjEngine engine : engines.values()) {
-                startQuickfixjEngine(engine);
-            }
-        }
-    }
-
-    private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception 
{
-        LOG.info("Starting QuickFIX/J engine: uri=", engine.getUri());
-        engine.start();
+        // we defer starting quickfix engines till the onCamelContextStarted 
callback
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
+        // stop engines when stopping component
         synchronized (engineInstancesLock) {
             for (QuickfixjEngine engine : engines.values()) {
                 engine.stop();
             }
         }
+        super.doStop();
+    }
+
+    private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception 
{
+        LOG.info("Starting QuickFIX/J engine: {}", engine.getUri());
+        engine.start();
     }
 
     // Test Support
@@ -131,4 +130,13 @@ public class QuickfixjComponent extends 
         this.configurations = configurations;
     }
 
+    @Override
+    public void onCamelContextStarted(CamelContext camelContext, boolean 
alreadyStarted) throws Exception {
+        // only start quickfix engines when CamelContext have finished starting
+        synchronized (engineInstancesLock) {
+            for (QuickfixjEngine engine : engines.values()) {
+                startQuickfixjEngine(engine);
+            }
+        }
+    }
 }

Modified: 
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1377416&r1=1377415&r2=1377416&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
 Sun Aug 26 10:49:41 2012
@@ -26,6 +26,7 @@ import java.util.concurrent.CopyOnWriteA
 
 import javax.management.JMException;
 
+import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.quickfixj.jmx.JmxExporter;
 import org.slf4j.Logger;
@@ -76,7 +77,7 @@ import quickfix.UnsupportedMessageType;
  * The wrapper will create an initiator or acceptor or both depending on the
  * roles of sessions described in the settings file.
  */
-public class QuickfixjEngine {
+public class QuickfixjEngine extends ServiceSupport {
     public static final String DEFAULT_START_TIME = "00:00:00";
     public static final String DEFAULT_END_TIME = "00:00:00";
     public static final long DEFAULT_HEARTBTINT = 30;
@@ -93,10 +94,7 @@ public class QuickfixjEngine {
     private final LogFactory sessionLogFactory;
     private final MessageFactory messageFactory;
     private final MessageCorrelator messageCorrelator = new 
MessageCorrelator();
-    
-    private boolean started;
     private List<QuickfixjEventListener> eventListeners = new 
CopyOnWriteArrayList<QuickfixjEventListener>();
-
     private final String uri;
 
     public enum ThreadModel {
@@ -191,7 +189,8 @@ public class QuickfixjEngine {
         return new SessionSettings(inputStream);
     }
 
-    public void start() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
         if (acceptor != null) {
             acceptor.start();
             if (jmxExporter != null) {
@@ -204,28 +203,25 @@ public class QuickfixjEngine {
                 jmxExporter.register(initiator);
             }
         }
-        started = true;
-    }
-
-    public void stop() throws Exception {
-        stop(forcedShutdown);
     }
 
-    public void stop(boolean force) throws Exception {
+    @Override
+    protected void doStop() throws Exception {
         if (acceptor != null) {
             acceptor.stop();
         }
         if (initiator != null) {
             initiator.stop();
         }
-        started = false;
     }
 
-    public boolean isStarted() {
-        return started;
+    @Override
+    protected void doShutdown() throws Exception {
+        // also clear event listeners
+        eventListeners.clear();
     }
-    
-    private Initiator createInitiator(Application application, SessionSettings 
settings, 
+
+    private Initiator createInitiator(Application application, SessionSettings 
settings,
             MessageStoreFactory messageStoreFactory, LogFactory 
sessionLogFactory, 
             MessageFactory messageFactory, ThreadModel threadModel) throws 
ConfigError {
         

Modified: 
camel/branches/camel-2.9.x/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java?rev=1377416&r1=1377415&r2=1377416&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
 Sun Aug 26 10:49:41 2012
@@ -26,6 +26,7 @@ import java.net.URLClassLoader;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -76,6 +77,7 @@ public class QuickfixjComponentTest {
     private SessionID sessionID;
     private SessionSettings settings;
     private QuickfixjComponent component;
+    private CamelContext camelContext;
     private MessageFactory engineMessageFactory;
     private MessageStoreFactory engineMessageStoreFactory;
     private LogFactory engineLogFactory;
@@ -118,9 +120,10 @@ public class QuickfixjComponentTest {
     }
     
     private void setUpComponent(boolean injectQfjPlugins) throws IOException, 
MalformedURLException, NoSuchMethodException {
-        DefaultCamelContext camelContext = new DefaultCamelContext();
+        camelContext = new DefaultCamelContext();
         component = new QuickfixjComponent();
         component.setCamelContext(camelContext);
+        camelContext.addComponent("quickfix", component);
         
         if (injectQfjPlugins) {
             engineMessageFactory = new DefaultMessageFactory();
@@ -144,6 +147,9 @@ public class QuickfixjComponentTest {
         if (component != null) {
             component.stop();
         }
+        if (camelContext != null) {
+            camelContext.stop();
+        }
     }
 
     @Test
@@ -168,8 +174,9 @@ public class QuickfixjComponentTest {
         assertThat(component.getEngines().get(settingsFile.getName()), 
is(notNullValue()));
         
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), 
is(false));
         assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID));
-        
-        component.start();
+
+        // will start the component
+        camelContext.start();
         
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), 
is(true));
         
         // Move these too an endpoint testcase if one exists
@@ -186,7 +193,8 @@ public class QuickfixjComponentTest {
 
         writeSettings();
 
-        component.start();
+        // will start the component
+        camelContext.start();
 
         Endpoint e1 = 
component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
         assertThat(component.getEngines().size(), is(1));
@@ -229,9 +237,10 @@ public class QuickfixjComponentTest {
 
         // Endpoint automatically starts the consumer
         assertThat(((StatefulService)consumer).isStarted(), is(true));
-        
-        component.start();
-        
+
+        // will start the component
+        camelContext.start();
+
         assertTrue("Session not created", latch.await(5000, 
TimeUnit.MILLISECONDS));
         
         component.stop();
@@ -283,8 +292,9 @@ public class QuickfixjComponentTest {
         });
         ServiceHelper.startService(consumer);
 
-        component.start();
-        
+        // will start the component
+        camelContext.start();
+
         assertTrue("Session not created", logonLatch.await(5000, 
TimeUnit.MILLISECONDS));
        
         Endpoint producerEndpoint = 
component.createEndpoint(getEndpointUri(settingsFile.getName(), 
acceptorSessionID));


Reply via email to