Author: cwiklik
Date: Mon Aug 9 15:47:08 2010
New Revision: 983692

URL: http://svn.apache.org/viewvc?rev=983692&view=rev
Log:
UIMA-1855 Fixes lazy initialization problem reported by Findbugs

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java?rev=983692&r1=983691&r2=983692&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/BrokerDeployer.java Mon Aug 9 15:47:08 2010 @@ -56,6 +56,8 @@ public class BrokerDeployer implements A private TransportConnector httpConnector = null; + private Object brokerInstanceMux = new Object(); + public BrokerDeployer(long maxMemoryinBytes) throws Exception { maxBrokerMemory = maxMemoryinBytes; startInternalBroker(); @@ -66,138 +68,182 @@ public class BrokerDeployer implements A } public BrokerService getBroker() { - return service; + synchronized( brokerInstanceMux ) { + return service; + } } - public void startInternalBroker() throws Exception { - TransportConnector connector = null; + public void startInternalBroker() throws Exception { + TransportConnector connector = null; + synchronized (brokerInstanceMux) { + if (maxBrokerMemory > 0) { + System.out + .println("Configuring Internal Broker With Max Memory Of:" + + maxBrokerMemory); + if (UIMAFramework.getLogger(CLASS_NAME) + .isLoggable(Level.CONFIG)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, + CLASS_NAME.getName(), "startInternalBroker", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_broker_memory__CONFIG", + new Object[] { maxBrokerMemory }); + } + } + String[] connectors = service.getNetworkConnectorURIs(); + if (connectors != null) { + for (int i = 0; i < connectors.length; 
i++) { + System.out + .println("ActiveMQ Broker Started With Connector:" + + connectors[i]); + } + brokerURI = service.getMasterConnectorURI(); + } else { + + String connectorList = ""; + service.setPersistent(false); + int startPort = BASE_JMX_PORT; + if (System.getProperties().containsKey( + "com.sun.management.jmxremote.port")) { + startPort = Integer.parseInt(System + .getProperty("com.sun.management.jmxremote.port")); + + } + while (startPort < MAX_PORT_THRESHOLD && !openPort(startPort)) { + startPort++; + } + if (startPort < (startPort + MAX_PORT_THRESHOLD)) { + MBeanServer jmxServer = null; + // Check if the MBeanServer is available. If it is, plug it + // into the + // local Broker. We only need one MBeanServer in the JVM + if ((jmxServer = ManagementContext.findTigerMBeanServer()) != null) { + System.out + .println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger."); + service.getManagementContext() + .setMBeanServer(jmxServer); + // Specify JMX Port + service.getManagementContext().setConnectorPort( + startPort); + } else { + service.getManagementContext().setConnectorPort( + startPort); + service.setUseJmx(true); + } + + System.setProperty("com.sun.management.jmxremote.port", + String.valueOf(startPort)); + + System.out + .println("JMX Console connect URI: service:jmx:rmi:///jndi/rmi://localhost:" + + startPort + "/jmxrmi"); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( + Level.CONFIG)) { + UIMAFramework + .getLogger(CLASS_NAME) + .logrb( + Level.CONFIG, + CLASS_NAME.getName(), + "startInternalBroker", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_jmx_uri__CONFIG", + new Object[] { "service:jmx:rmi:///jndi/rmi://localhost:" + + startPort + "/jmxrmi" }); + } + } + + brokerURI = generateInternalURI("tcp", 18810, true, false); + + // Wait until sucessfull adding connector to the broker + // Sleeps for 1sec between retries until success + int timeBetweenRetries = 1000; + boolean tcpConnectorAcquiredValidPort = false; + while 
(!tcpConnectorAcquiredValidPort) { + try { + tcpConnector = service.addConnector(brokerURI); + tcpConnectorAcquiredValidPort = true; + } catch (Exception e) { + synchronized (this) { + wait(timeBetweenRetries); + } + } // silence InstanceAlreadyExistsException + + } + System.out.println("Adding TCP Connector:" + + tcpConnector.getConnectUri()); + if (UIMAFramework.getLogger(CLASS_NAME) + .isLoggable(Level.CONFIG)) { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.CONFIG, + CLASS_NAME.getName(), + "startInternalBroker", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_adding_connector__CONFIG", + new Object[] { "Adding TCP Connector", + tcpConnector.getConnectUri() }); + } + connectorList = tcpConnector.getName(); + if (System.getProperty("StompSupport") != null) { + String stompURI = generateInternalURI("stomp", 61613, + false, false); + connector = service.addConnector(stompURI); + System.out.println("Adding STOMP Connector:" + + connector.getConnectUri()); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( + Level.CONFIG)) { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.CONFIG, + CLASS_NAME.getName(), + "startInternalBroker", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_adding_connector__CONFIG", + new Object[] { "Adding STOMP Connector", + connector.getConnectUri() }); + } + connectorList += "," + connector.getName(); + } + if (System.getProperty("HTTP") != null) { + String stringPort = System.getProperty("HTTP"); + int p = Integer.parseInt(stringPort); + String httpURI = generateInternalURI("http", p, false, true); + httpConnector = service.addConnector(httpURI); + System.out.println("Adding HTTP Connector:" + + httpConnector.getConnectUri()); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( + Level.CONFIG)) { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.CONFIG, + CLASS_NAME.getName(), + "startInternalBroker", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_adding_connector__CONFIG", + new Object[] { "Adding HTTP 
Connector", + httpConnector.getConnectUri() }); + } + connectorList += "," + httpConnector.getName(); + } + service.start(); + System.setProperty("ActiveMQConnectors", connectorList); + System.out.println("Broker Service Started - URL:" + + service.getVmConnectorURI()); + if (UIMAFramework.getLogger(CLASS_NAME) + .isLoggable(Level.CONFIG)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, + CLASS_NAME.getName(), "startInternalBroker", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_broker_started__CONFIG", + new Object[] { service.getVmConnectorURI() }); + } + } + } + + // Allow the connectors some time to start + synchronized (semaphore) { + semaphore.wait(1000); + } + // System.out.println("JMX Server Port:"+service.getManagementContext().getRmiServerPort()); + // setConnectorPort(startPort); - if (maxBrokerMemory > 0) { - System.out.println("Configuring Internal Broker With Max Memory Of:" + maxBrokerMemory); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), - "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_broker_memory__CONFIG", new Object[] { maxBrokerMemory }); - } - } - String[] connectors = service.getNetworkConnectorURIs(); - if (connectors != null) { - for (int i = 0; i < connectors.length; i++) { - System.out.println("ActiveMQ Broker Started With Connector:" + connectors[i]); - } - brokerURI = service.getMasterConnectorURI(); - } else { - - String connectorList = ""; - service.setPersistent(false); - int startPort = BASE_JMX_PORT; - if (System.getProperties().containsKey("com.sun.management.jmxremote.port")) { - startPort = Integer.parseInt(System.getProperty("com.sun.management.jmxremote.port")); - - } - while (startPort < MAX_PORT_THRESHOLD && !openPort(startPort)) { - startPort++; - } - if (startPort < (startPort + MAX_PORT_THRESHOLD)) { - MBeanServer jmxServer = null; - // Check if the MBeanServer is available. 
If it is, plug it into the - // local Broker. We only need one MBeanServer in the JVM - if ((jmxServer = ManagementContext.findTigerMBeanServer()) != null) { - System.out.println(">>> Found TigerMBeanServer Running. Attaching Broker to Tiger."); - service.getManagementContext().setMBeanServer(jmxServer); - // Specify JMX Port - service.getManagementContext().setConnectorPort(startPort); - } else { - service.getManagementContext().setConnectorPort(startPort); - service.setUseJmx(true); - } - - System.setProperty("com.sun.management.jmxremote.port", String.valueOf(startPort)); - - System.out.println("JMX Console connect URI: service:jmx:rmi:///jndi/rmi://localhost:" - + startPort + "/jmxrmi"); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { - UIMAFramework.getLogger(CLASS_NAME) - .logrb( - Level.CONFIG, - CLASS_NAME.getName(), - "startInternalBroker", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_jmx_uri__CONFIG", - new Object[] { "service:jmx:rmi:///jndi/rmi://localhost:" + startPort - + "/jmxrmi" }); - } - } - - brokerURI = generateInternalURI("tcp", 18810, true, false); - - // Wait until sucessfull adding connector to the broker - // Sleeps for 1sec between retries until success - int timeBetweenRetries = 1000; - boolean tcpConnectorAcquiredValidPort = false; - while (!tcpConnectorAcquiredValidPort) { - try { - tcpConnector = service.addConnector(brokerURI); - tcpConnectorAcquiredValidPort = true; - } catch (Exception e) { - synchronized (this) { - wait(timeBetweenRetries); - } - } // silence InstanceAlreadyExistsException - - } - System.out.println("Adding TCP Connector:" + tcpConnector.getConnectUri()); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), - "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_adding_connector__CONFIG", - new Object[] { "Adding TCP Connector", tcpConnector.getConnectUri() }); - } - 
connectorList = tcpConnector.getName(); - if (System.getProperty("StompSupport") != null) { - String stompURI = generateInternalURI("stomp", 61613, false, false); - connector = service.addConnector(stompURI); - System.out.println("Adding STOMP Connector:" + connector.getConnectUri()); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), - "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_adding_connector__CONFIG", - new Object[] { "Adding STOMP Connector", connector.getConnectUri() }); - } - connectorList += "," + connector.getName(); - } - if (System.getProperty("HTTP") != null) { - String stringPort = System.getProperty("HTTP"); - int p = Integer.parseInt(stringPort); - String httpURI = generateInternalURI("http", p, false, true); - httpConnector = service.addConnector(httpURI); - System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri()); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), - "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_adding_connector__CONFIG", - new Object[] { "Adding HTTP Connector", httpConnector.getConnectUri() }); - } - connectorList += "," + httpConnector.getName(); - } - service.start(); - System.setProperty("ActiveMQConnectors", connectorList); - System.out.println("Broker Service Started - URL:" + service.getVmConnectorURI()); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), - "startInternalBroker", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_broker_started__CONFIG", new Object[] { service.getVmConnectorURI() }); - } - } - - // Allow the connectors some time to start - synchronized (semaphore) { - semaphore.wait(1000); - } - // System.out.println("JMX Server 
Port:"+service.getManagementContext().getRmiServerPort()); - // setConnectorPort(startPort); - - } + } private boolean openPort(int aPort) { ServerSocket ssocket = null; @@ -285,50 +331,59 @@ public class BrokerDeployer implements A * objects from JMX Server. * */ - public void stop() { - Object monitor = new Object(); - if (service != null) { - try { - // if ( usageListener != null ) - // { - // service.getMemoryManager().removeUsageListener(usageListener); - // } - if (tcpConnector != null) { - tcpConnector.stop(); - System.out - .println("Broker Connector:" + tcpConnector.getUri().toString() + " is stopped"); - } - if (httpConnector != null) { - System.out.println("Broker Stopping HTTP Connector:" + httpConnector.getUri().toString()); - httpConnector.stop(); - System.out.println("Broker Connector:" + httpConnector.getUri().toString() - + " is stopped"); - } - service.getManagementContext().stop(); - service.stop(); - Broker broker = service.getBroker(); - while (!broker.isStopped()) { - synchronized (monitor) { - try { - monitor.wait(20); // wait for the broker to terminate - } catch (Exception e) { - } - } + public void stop() { + Object monitor = new Object(); + synchronized (brokerInstanceMux) { + if (service != null) { + try { + // if ( usageListener != null ) + // { + // service.getMemoryManager().removeUsageListener(usageListener); + // } + if (tcpConnector != null) { + tcpConnector.stop(); + System.out.println("Broker Connector:" + + tcpConnector.getUri().toString() + + " is stopped"); + } + if (httpConnector != null) { + System.out.println("Broker Stopping HTTP Connector:" + + httpConnector.getUri().toString()); + httpConnector.stop(); + System.out.println("Broker Connector:" + + httpConnector.getUri().toString() + + " is stopped"); + } + service.getManagementContext().stop(); + service.stop(); + Broker broker = service.getBroker(); + while (!broker.isStopped()) { + synchronized (monitor) { + try { + monitor.wait(20); // wait for the broker to + // 
terminate + } catch (Exception e) { + } + } + + } + System.out.println("Broker is stopped"); + broker = null; + service = null; + } catch (Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( + Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.WARNING, CLASS_NAME.getName(), "stop", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_exception__WARNING", e); + } - } - System.out.println("Broker is stopped"); - broker = null; - service = null; - } catch (Exception e) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_exception__WARNING", e); - } + } + } - } - } - } + } + } /** * Callback method invoked by Spring container during its lifecycle changes Ignore all events