Repository: activemq Updated Branches: refs/heads/master cc9b9b084 -> 00d19e7b6
http://git-wip-us.apache.org/repos/asf/activemq/blob/00d19e7b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig deleted file mode 100644 index e27279c..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig +++ /dev/null @@ -1,3147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.security.Provider; -import java.security.Security; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionMetaData; -import org.apache.activemq.ConfigurationException; -import org.apache.activemq.Service; -import org.apache.activemq.advisory.AdvisoryBroker; -import org.apache.activemq.broker.cluster.ConnectionSplitBroker; -import org.apache.activemq.broker.jmx.AnnotatedMBean; -import org.apache.activemq.broker.jmx.BrokerMBeanSupport; -import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.activemq.broker.jmx.ConnectorView; -import org.apache.activemq.broker.jmx.ConnectorViewMBean; -import org.apache.activemq.broker.jmx.HealthView; -import org.apache.activemq.broker.jmx.HealthViewMBean; -import org.apache.activemq.broker.jmx.JmsConnectorView; -import org.apache.activemq.broker.jmx.JobSchedulerView; -import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; -import org.apache.activemq.broker.jmx.Log4JConfigView; -import org.apache.activemq.broker.jmx.ManagedRegionBroker; -import org.apache.activemq.broker.jmx.ManagementContext; -import org.apache.activemq.broker.jmx.NetworkConnectorView; -import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; -import org.apache.activemq.broker.jmx.ProxyConnectorView; -import org.apache.activemq.broker.region.CompositeDestinationInterceptor; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationFactory; -import org.apache.activemq.broker.region.DestinationFactoryImpl; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.virtual.MirroredQueue; -import org.apache.activemq.broker.region.virtual.VirtualDestination; -import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; -import org.apache.activemq.broker.region.virtual.VirtualTopic; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.broker.scheduler.SchedulerBroker; -import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.BrokerId; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.network.ConnectionFilter; -import org.apache.activemq.network.DiscoveryNetworkConnector; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.network.jms.JmsConnector; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.proxy.ProxyConnector; -import org.apache.activemq.security.MessageAuthorizationPolicy; -import org.apache.activemq.selector.SelectorParser; -import org.apache.activemq.store.JournaledStore; -import org.apache.activemq.store.PListStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.PersistenceAdapterFactory; -import org.apache.activemq.store.memory.MemoryPersistenceAdapter; -import org.apache.activemq.thread.Scheduler; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.TransportFactorySupport; -import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.vm.VMTransportFactory; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.util.BrokerSupport; -import org.apache.activemq.util.DefaultIOExceptionHandler; -import org.apache.activemq.util.IOExceptionHandler; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.InetAddressUtil; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.StoreUtil; -import org.apache.activemq.util.ThreadPoolUtils; -import org.apache.activemq.util.TimeUtils; -import org.apache.activemq.util.URISupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; - -/** - * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a - * number of transport connectors, network connectors and a bunch of properties - * which can be used to configure the broker as its lazily created. - * - * @org.apache.xbean.XBean - */ -public class BrokerService implements Service { - public static final String DEFAULT_PORT = "61616"; - public static final String LOCAL_HOST_NAME; - public static final String BROKER_VERSION; - public static final String DEFAULT_BROKER_NAME = "localhost"; - public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; - public static final long DEFAULT_START_TIMEOUT = 600000L; - - private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); - - @SuppressWarnings("unused") - private static final long serialVersionUID = 7353129142305630237L; - - private boolean useJmx = true; - private boolean enableStatistics = true; - private boolean persistent = true; - private boolean populateJMSXUserID; - private boolean useAuthenticatedPrincipalForJMSXUserID; - private boolean populateUserNameInMBeans; - private long mbeanInvocationTimeout = 0; - - private boolean useShutdownHook = true; - private boolean useLoggingForShutdownErrors; - private boolean shutdownOnMasterFailure; - private boolean shutdownOnSlaveFailure; - private boolean waitForSlave; - private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT; - private boolean passiveSlave; - private String brokerName = DEFAULT_BROKER_NAME; - private File dataDirectoryFile; - private File tmpDataDirectory; - private Broker broker; - private BrokerView adminView; - private ManagementContext managementContext; - private ObjectName brokerObjectName; - private TaskRunnerFactory taskRunnerFactory; - private TaskRunnerFactory persistenceTaskRunnerFactory; - private SystemUsage systemUsage; - private SystemUsage producerSystemUsage; - private SystemUsage consumerSystemUsaage; - private PersistenceAdapter persistenceAdapter; - private PersistenceAdapterFactory persistenceFactory; - protected DestinationFactory destinationFactory; - private MessageAuthorizationPolicy messageAuthorizationPolicy; - private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); - private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); - private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>(); - private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); - private final List<Service> services = new ArrayList<Service>(); - private transient Thread shutdownHook; - private String[] transportConnectorURIs; - private String[] networkConnectorURIs; - private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges - // to other jms messaging systems - private boolean deleteAllMessagesOnStartup; - private boolean advisorySupport = true; - private URI vmConnectorURI; - private String defaultSocketURIString; - private PolicyMap destinationPolicy; - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean stopped = new AtomicBoolean(false); - private final AtomicBoolean stopping = new AtomicBoolean(false); - private BrokerPlugin[] plugins; - private boolean keepDurableSubsActive = true; - private boolean useVirtualTopics = true; - private boolean useMirroredQueues = false; - private boolean useTempMirroredQueues = true; - private BrokerId brokerId; - private volatile DestinationInterceptor[] destinationInterceptors; - private ActiveMQDestination[] destinations; - private PListStore tempDataStore; - private int persistenceThreadPriority = Thread.MAX_PRIORITY; - private boolean useLocalHostBrokerName; - private final CountDownLatch stoppedLatch = new CountDownLatch(1); - private final CountDownLatch startedLatch = new CountDownLatch(1); - private Broker regionBroker; - private int producerSystemUsagePortion = 60; - private int consumerSystemUsagePortion = 40; - private boolean splitSystemUsageForProducersConsumers; - private boolean monitorConnectionSplits = false; - private int taskRunnerPriority = Thread.NORM_PRIORITY; - private boolean dedicatedTaskRunner; - private boolean cacheTempDestinations = false;// useful for failover - private int timeBeforePurgeTempDestinations = 5000; - private final List<Runnable> shutdownHooks = new ArrayList<Runnable>(); - private boolean systemExitOnShutdown; - private int systemExitOnShutdownExitCode; - private SslContext sslContext; - private boolean forceStart = false; - private IOExceptionHandler ioExceptionHandler; - private boolean schedulerSupport = false; - private File schedulerDirectoryFile; - private Scheduler scheduler; - private ThreadPoolExecutor executor; - private int schedulePeriodForDestinationPurge= 0; - private int maxPurgedDestinationsPerSweep = 0; - private int schedulePeriodForDiskUsageCheck = 0; - private boolean diskUsageCheckRegrowPercentChange = true; - private BrokerContext brokerContext; - private boolean networkConnectorStartAsync = false; - private boolean allowTempAutoCreationOnSend; - private JobSchedulerStore jobSchedulerStore; - private final AtomicLong totalConnections = new AtomicLong(); - private final AtomicInteger currentConnections = new AtomicInteger(); - - private long offlineDurableSubscriberTimeout = -1; - private long offlineDurableSubscriberTaskSchedule = 300000; - private DestinationFilter virtualConsumerDestinationFilter; - - private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false); - private Throwable startException = null; - private boolean startAsync = false; - private Date startDate; - private boolean slave = true; - - private boolean restartAllowed = true; - private boolean restartRequested = false; - private boolean rejectDurableConsumers = false; - - private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; - - static { - - try { - ClassLoader loader = BrokerService.class.getClassLoader(); - Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); - Provider bouncycastle = (Provider) clazz.newInstance(); - Security.insertProviderAt(bouncycastle, 2); - LOG.info("Loaded the Bouncy Castle security provider."); - } catch(Throwable e) { - // No BouncyCastle found so we use the default Java Security Provider - } - - String localHostName = "localhost"; - try { - localHostName = InetAddressUtil.getLocalHostName(); - } catch (UnknownHostException e) { - LOG.error("Failed to resolve localhost"); - } - LOCAL_HOST_NAME = localHostName; - - String version = null; - try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) { - if (in != null) { - try(InputStreamReader isr = new InputStreamReader(in); - BufferedReader reader = new BufferedReader(isr)) { - version = reader.readLine(); - } - } - } catch (IOException ie) { - LOG.warn("Error reading broker version ", ie); - } - BROKER_VERSION = version; - } - - @Override - public String toString() { - return "BrokerService[" + getBrokerName() + "]"; - } - - private String getBrokerVersion() { - String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; - if (version == null) { - version = BROKER_VERSION; - } - - return version; - } - - /** - * Adds a new transport connector for the given bind address - * - * @return the newly created and added transport connector - * @throws Exception - */ - public TransportConnector addConnector(String bindAddress) throws Exception { - return addConnector(new URI(bindAddress)); - } - - /** - * Adds a new transport connector for the given bind address - * - * @return the newly created and added transport connector - * @throws Exception - */ - public TransportConnector addConnector(URI bindAddress) throws Exception { - return addConnector(createTransportConnector(bindAddress)); - } - - /** - * Adds a new transport connector for the given TransportServer transport - * - * @return the newly created and added transport connector - * @throws Exception - */ - public TransportConnector addConnector(TransportServer transport) throws Exception { - return addConnector(new TransportConnector(transport)); - } - - /** - * Adds a new transport connector - * - * @return the transport connector - * @throws Exception - */ - public TransportConnector addConnector(TransportConnector connector) throws Exception { - transportConnectors.add(connector); - return connector; - } - - /** - * Stops and removes a transport connector from the broker. - * - * @param connector - * @return true if the connector has been previously added to the broker - * @throws Exception - */ - public boolean removeConnector(TransportConnector connector) throws Exception { - boolean rc = transportConnectors.remove(connector); - if (rc) { - unregisterConnectorMBean(connector); - } - return rc; - } - - /** - * Adds a new network connector using the given discovery address - * - * @return the newly created and added network connector - * @throws Exception - */ - public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { - return addNetworkConnector(new URI(discoveryAddress)); - } - - /** - * Adds a new proxy connector using the given bind address - * - * @return the newly created and added network connector - * @throws Exception - */ - public ProxyConnector addProxyConnector(String bindAddress) throws Exception { - return addProxyConnector(new URI(bindAddress)); - } - - /** - * Adds a new network connector using the given discovery address - * - * @return the newly created and added network connector - * @throws Exception - */ - public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { - NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); - return addNetworkConnector(connector); - } - - /** - * Adds a new proxy connector using the given bind address - * - * @return the newly created and added network connector - * @throws Exception - */ - public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { - ProxyConnector connector = new ProxyConnector(); - connector.setBind(bindAddress); - connector.setRemote(new URI("fanout:multicast://default")); - return addProxyConnector(connector); - } - - /** - * Adds a new network connector to connect this broker to a federated - * network - */ - public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { - connector.setBrokerService(this); - connector.setLocalUri(getVmConnectorURI()); - // Set a connection filter so that the connector does not establish loop - // back connections. - connector.setConnectionFilter(new ConnectionFilter() { - @Override - public boolean connectTo(URI location) { - List<TransportConnector> transportConnectors = getTransportConnectors(); - for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { - try { - TransportConnector tc = iter.next(); - if (location.equals(tc.getConnectUri())) { - return false; - } - } catch (Throwable e) { - } - } - return true; - } - }); - networkConnectors.add(connector); - return connector; - } - - /** - * Removes the given network connector without stopping it. The caller - * should call {@link NetworkConnector#stop()} to close the connector - */ - public boolean removeNetworkConnector(NetworkConnector connector) { - boolean answer = networkConnectors.remove(connector); - if (answer) { - unregisterNetworkConnectorMBean(connector); - } - return answer; - } - - public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { - URI uri = getVmConnectorURI(); - connector.setLocalUri(uri); - proxyConnectors.add(connector); - if (isUseJmx()) { - registerProxyConnectorMBean(connector); - } - return connector; - } - - public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { - connector.setBrokerService(this); - jmsConnectors.add(connector); - if (isUseJmx()) { - registerJmsConnectorMBean(connector); - } - return connector; - } - - public JmsConnector removeJmsConnector(JmsConnector connector) { - if (jmsConnectors.remove(connector)) { - return connector; - } - return null; - } - - public void masterFailed() { - if (shutdownOnMasterFailure) { - LOG.error("The Master has failed ... shutting down"); - try { - stop(); - } catch (Exception e) { - LOG.error("Failed to stop for master failure", e); - } - } else { - LOG.warn("Master Failed - starting all connectors"); - try { - startAllConnectors(); - broker.nowMasterBroker(); - } catch (Exception e) { - LOG.error("Failed to startAllConnectors", e); - } - } - } - - public String getUptime() { - long delta = getUptimeMillis(); - - if (delta == 0) { - return "not started"; - } - - return TimeUtils.printDuration(delta); - } - - public long getUptimeMillis() { - if (startDate == null) { - return 0; - } - - return new Date().getTime() - startDate.getTime(); - } - - public boolean isStarted() { - return started.get() && startedLatch.getCount() == 0; - } - - /** - * Forces a start of the broker. - * By default a BrokerService instance that was - * previously stopped using BrokerService.stop() cannot be restarted - * using BrokerService.start(). - * This method enforces a restart. - * It is not recommended to force a restart of the broker and will not work - * for most but some very trivial broker configurations. - * For restarting a broker instance we recommend to first call stop() on - * the old instance and then recreate a new BrokerService instance. - * - * @param force - if true enforces a restart. - * @throws Exception - */ - public void start(boolean force) throws Exception { - forceStart = force; - stopped.set(false); - started.set(false); - start(); - } - - // Service interface - // ------------------------------------------------------------------------- - - protected boolean shouldAutostart() { - return true; - } - - /** - * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions - * - * delegates to autoStart, done to prevent backwards incompatible signature change - */ - @PostConstruct - private void postConstruct() { - try { - autoStart(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - /** - * - * @throws Exception - * @org. apache.xbean.InitMethod - */ - public void autoStart() throws Exception { - if(shouldAutostart()) { - start(); - } - } - - @Override - public void start() throws Exception { - if (stopped.get() || !started.compareAndSet(false, true)) { - // lets just ignore redundant start() calls - // as its way too easy to not be completely sure if start() has been - // called or not with the gazillion of different configuration - // mechanisms - // throw new IllegalStateException("Already started."); - return; - } - - stopping.set(false); - startDate = new Date(); - MDC.put("activemq.broker", brokerName); - - try { - if (systemExitOnShutdown && useShutdownHook) { - throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); - } - processHelperProperties(); - if (isUseJmx()) { - // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and - // we cannot cleanup clear that during shutdown of the broker. - MDC.remove("activemq.broker"); - try { - startManagementContext(); - for (NetworkConnector connector : getNetworkConnectors()) { - registerNetworkConnectorMBean(connector); - } - } finally { - MDC.put("activemq.broker", brokerName); - } - } - - // in jvm master slave, lets not publish over existing broker till we get the lock - final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); - if (brokerRegistry.lookup(getBrokerName()) == null) { - brokerRegistry.bind(getBrokerName(), BrokerService.this); - } - startPersistenceAdapter(startAsync); - startBroker(startAsync); - brokerRegistry.bind(getBrokerName(), BrokerService.this); - } catch (Exception e) { - LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e); - try { - if (!stopped.get()) { - stop(); - } - } catch (Exception ex) { - LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex); - } - throw e; - } finally { - MDC.remove("activemq.broker"); - } - } - - private void startPersistenceAdapter(boolean async) throws Exception { - if (async) { - new Thread("Persistence Adapter Starting Thread") { - @Override - public void run() { - try { - doStartPersistenceAdapter(); - } catch (Throwable e) { - startException = e; - } finally { - synchronized (persistenceAdapterStarted) { - persistenceAdapterStarted.set(true); - persistenceAdapterStarted.notifyAll(); - } - } - } - }.start(); - } else { - doStartPersistenceAdapter(); - } - } - - private void doStartPersistenceAdapter() throws Exception { - getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); - getPersistenceAdapter().setBrokerName(getBrokerName()); - LOG.info("Using Persistence Adapter: {}", getPersistenceAdapter()); - if (deleteAllMessagesOnStartup) { - deleteAllMessages(); - } - getPersistenceAdapter().start(); - - getJobSchedulerStore(); - if (jobSchedulerStore != null) { - try { - jobSchedulerStore.start(); - } catch (Exception e) { - RuntimeException exception = new RuntimeException( - "Failed to start job scheduler store: " + jobSchedulerStore, e); - LOG.error(exception.getLocalizedMessage(), e); - throw exception; - } - } - } - - private void startBroker(boolean async) throws Exception { - if (async) { - new Thread("Broker Starting Thread") { - @Override - public void run() { - try { - synchronized (persistenceAdapterStarted) { - if (!persistenceAdapterStarted.get()) { - persistenceAdapterStarted.wait(); - } - } - doStartBroker(); - } catch (Throwable t) { - startException = t; - } - } - }.start(); - } else { - doStartBroker(); - } - } - - private void doStartBroker() throws Exception { - if (startException != null) { - return; - } - startDestinations(); - addShutdownHook(); - - broker = getBroker(); - brokerId = broker.getBrokerId(); - - // need to log this after creating the broker so we have its id and name - LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId }); - broker.start(); - - if (isUseJmx()) { - if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { - // try to restart management context - // typical for slaves that use the same ports as master - managementContext.stop(); - startManagementContext(); - } - ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; - managedBroker.setContextBroker(broker); - adminView.setBroker(managedBroker); - } - - if (ioExceptionHandler == null) { - setIoExceptionHandler(new DefaultIOExceptionHandler()); - } - - if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { - ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); - Log4JConfigView log4jConfigView = new Log4JConfigView(); - AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); - } - - startAllConnectors(); - - LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); - LOG.info("For help or more information please see: http://activemq.apache.org"); - - getBroker().brokerServiceStarted(); - checkSystemUsageLimits(); - startedLatch.countDown(); - getBroker().nowMasterBroker(); - } - - /** - * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions - * - * delegates to stop, done to prevent backwards incompatible signature change - */ - @PreDestroy - private void preDestroy () { - try { - stop(); - } catch (Exception ex) { - throw new RuntimeException(); - } - } - - /** - * - * @throws Exception - * @org.apache .xbean.DestroyMethod - */ - @Override - public void stop() throws Exception { - if (!stopping.compareAndSet(false, true)) { - LOG.trace("Broker already stopping/stopped"); - return; - } - - MDC.put("activemq.broker", brokerName); - - if (systemExitOnShutdown) { - new Thread() { - @Override - public void run() { - System.exit(systemExitOnShutdownExitCode); - } - }.start(); - } - - LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} ); - - removeShutdownHook(); - if (this.scheduler != null) { - this.scheduler.stop(); - this.scheduler = null; - } - ServiceStopper stopper = new ServiceStopper(); - if (services != null) { - for (Service service : services) { - stopper.stop(service); - } - } - stopAllConnectors(stopper); - this.slave = true; - // remove any VMTransports connected - // this has to be done after services are stopped, - // to avoid timing issue with discovery (spinning up a new instance) - BrokerRegistry.getInstance().unbind(getBrokerName()); - VMTransportFactory.stopped(getBrokerName()); - if (broker != null) { - stopper.stop(broker); - broker = null; - } - - if (jobSchedulerStore != null) { - jobSchedulerStore.stop(); - jobSchedulerStore = null; - } - if (tempDataStore != null) { - tempDataStore.stop(); - tempDataStore = null; - } - try { - stopper.stop(persistenceAdapter); - persistenceAdapter = null; - if (isUseJmx()) { - stopper.stop(getManagementContext()); - managementContext = null; - } - // Clear SelectorParser cache to free memory - SelectorParser.clearCache(); - } finally { - started.set(false); - stopped.set(true); - stoppedLatch.countDown(); - } - - if (this.taskRunnerFactory != null) { - this.taskRunnerFactory.shutdown(); - this.taskRunnerFactory = null; - } - if (this.executor != null) { - ThreadPoolUtils.shutdownNow(executor); - this.executor = null; - } - - this.destinationInterceptors = null; - this.destinationFactory = null; - - if (startDate != null) { - LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()}); - } - LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); - - synchronized (shutdownHooks) { - for (Runnable hook : shutdownHooks) { - try { - hook.run(); - } catch (Throwable e) { - stopper.onException(hook, e); - } - } - } - - MDC.remove("activemq.broker"); - - // and clear start date - startDate = null; - - stopper.throwFirstException(); - } - - public boolean checkQueueSize(String queueName) { - long count = 0; - long queueSize = 0; - Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); - for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { - if (entry.getKey().isQueue()) { - if (entry.getValue().getName().matches(queueName)) { - queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); - count += queueSize; - if (queueSize > 0) { - LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); - } - } - } - } - return count == 0; - } - - /** - * This method (both connectorName and queueName are using regex to match) - * 1. stop the connector (supposed the user input the connector which the - * clients connect to) 2. to check whether there is any pending message on - * the queues defined by queueName 3. supposedly, after stop the connector, - * client should failover to other broker and pending messages should be - * forwarded. if no pending messages, the method finally call stop to stop - * the broker. - * - * @param connectorName - * @param queueName - * @param timeout - * @param pollInterval - * @throws Exception - */ - public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { - if (isUseJmx()) { - if (connectorName == null || queueName == null || timeout <= 0) { - throw new Exception( - "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); - } - if (pollInterval <= 0) { - pollInterval = 30; - } - LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{ - connectorName, queueName, timeout, pollInterval - }); - TransportConnector connector; - for (int i = 0; i < transportConnectors.size(); i++) { - connector = transportConnectors.get(i); - if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { - connector.stop(); - } - } - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeout * 1000) { - // check quesize until it gets zero - if (checkQueueSize(queueName)) { - stop(); - break; - } else { - Thread.sleep(pollInterval * 1000); - } - } - if (stopped.get()) { - LOG.info("Successfully stop the broker."); - } else { - LOG.info("There is still pending message on the queue. Please check and stop the broker manually."); - } - } - } - - /** - * A helper method to block the caller thread until the broker has been - * stopped - */ - public void waitUntilStopped() { - while (isStarted() && !stopped.get()) { - try { - stoppedLatch.await(); - } catch (InterruptedException e) { - // ignore - } - } - } - - public boolean isStopped() { - return stopped.get(); - } - - /** - * A helper method to block the caller thread until the broker has fully started - * @return boolean true if wait succeeded false if broker was not started or was stopped - */ - public boolean waitUntilStarted() { - return waitUntilStarted(DEFAULT_START_TIMEOUT); - } - - /** - * A helper method to block the caller thread until the broker has fully started - * - * @param timeout - * the amount of time to wait before giving up and returning false. - * - * @return boolean true if wait succeeded false if broker was not started or was stopped - */ - public boolean waitUntilStarted(long timeout) { - boolean waitSucceeded = isStarted(); - long expiration = Math.max(0, timeout + System.currentTimeMillis()); - while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { - try { - if (startException != null) { - return waitSucceeded; - } - waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignore) { - } - } - return waitSucceeded; - } - - // Properties - // ------------------------------------------------------------------------- - /** - * Returns the message broker - */ - public Broker getBroker() throws Exception { - if (broker == null) { - broker = createBroker(); - } - return broker; - } - - /** - * Returns the administration view of the broker; used to create and destroy - * resources such as queues and topics. Note this method returns null if JMX - * is disabled. - */ - public BrokerView getAdminView() throws Exception { - if (adminView == null) { - // force lazy creation - getBroker(); - } - return adminView; - } - - public void setAdminView(BrokerView adminView) { - this.adminView = adminView; - } - - public String getBrokerName() { - return brokerName; - } - - /** - * Sets the name of this broker; which must be unique in the network - * - * @param brokerName - */ - public void setBrokerName(String brokerName) { - if (brokerName == null) { - throw new NullPointerException("The broker name cannot be null"); - } - String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_"); - if (!str.equals(brokerName)) { - LOG.error("Broker Name: {} contained illegal characters - replaced with {}", brokerName, str); - } - this.brokerName = str.trim(); - } - - public PersistenceAdapterFactory getPersistenceFactory() { - return persistenceFactory; - } - - public File getDataDirectoryFile() { - if (dataDirectoryFile == null) { - dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); - } - return dataDirectoryFile; - } - - public File getBrokerDataDirectory() { - String brokerDir = getBrokerName(); - return new File(getDataDirectoryFile(), brokerDir); - } - - /** - * Sets the directory in which the data files will be stored by default for - * the JDBC and Journal persistence adaptors. - * - * @param dataDirectory - * the directory to store data files - */ - public void setDataDirectory(String dataDirectory) { - setDataDirectoryFile(new File(dataDirectory)); - } - - /** - * Sets the directory in which the data files will be stored by default for - * the JDBC and Journal persistence adaptors. - * - * @param dataDirectoryFile - * the directory to store data files - */ - public void setDataDirectoryFile(File dataDirectoryFile) { - this.dataDirectoryFile = dataDirectoryFile; - } - - /** - * @return the tmpDataDirectory - */ - public File getTmpDataDirectory() { - if (tmpDataDirectory == null) { - tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); - } - return tmpDataDirectory; - } - - /** - * @param tmpDataDirectory - * the tmpDataDirectory to set - */ - public void setTmpDataDirectory(File tmpDataDirectory) { - this.tmpDataDirectory = tmpDataDirectory; - } - - public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { - this.persistenceFactory = persistenceFactory; - } - - public void setDestinationFactory(DestinationFactory destinationFactory) { - this.destinationFactory = destinationFactory; - } - - public boolean isPersistent() { - return persistent; - } - - /** - * Sets whether or not persistence is enabled or disabled. - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" - */ - public void setPersistent(boolean persistent) { - this.persistent = persistent; - } - - public boolean isPopulateJMSXUserID() { - return populateJMSXUserID; - } - - /** - * Sets whether or not the broker should populate the JMSXUserID header. - */ - public void setPopulateJMSXUserID(boolean populateJMSXUserID) { - this.populateJMSXUserID = populateJMSXUserID; - } - - public SystemUsage getSystemUsage() { - try { - if (systemUsage == null) { - - systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); - systemUsage.setExecutor(getExecutor()); - systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB - systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB - systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB - systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB - addService(this.systemUsage); - } - return systemUsage; - } catch (IOException e) { - LOG.error("Cannot create SystemUsage", e); - throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); - } - } - - public void setSystemUsage(SystemUsage memoryManager) { - if (this.systemUsage != null) { - removeService(this.systemUsage); - } - this.systemUsage = memoryManager; - if (this.systemUsage.getExecutor()==null) { - this.systemUsage.setExecutor(getExecutor()); - } - addService(this.systemUsage); - } - - /** - * @return the consumerUsageManager - * @throws IOException - */ - public SystemUsage getConsumerSystemUsage() throws IOException { - if (this.consumerSystemUsaage == null) { - if (splitSystemUsageForProducersConsumers) { - this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); - float portion = consumerSystemUsagePortion / 100f; - this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); - addService(this.consumerSystemUsaage); - } else { - consumerSystemUsaage = getSystemUsage(); - } - } - return this.consumerSystemUsaage; - } - - /** - * @param consumerSystemUsaage - * the storeSystemUsage to set - */ - public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { - if (this.consumerSystemUsaage != null) { - removeService(this.consumerSystemUsaage); - } - this.consumerSystemUsaage = consumerSystemUsaage; - addService(this.consumerSystemUsaage); - } - - /** - * @return the producerUsageManager - * @throws IOException - */ - public SystemUsage getProducerSystemUsage() throws IOException { - if (producerSystemUsage == null) { - if (splitSystemUsageForProducersConsumers) { - producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); - float portion = producerSystemUsagePortion / 100f; - producerSystemUsage.getMemoryUsage().setUsagePortion(portion); - addService(producerSystemUsage); - } else { - producerSystemUsage = getSystemUsage(); - } - } - return producerSystemUsage; - } - - /** - * @param producerUsageManager - * the producerUsageManager to set - */ - public void setProducerSystemUsage(SystemUsage producerUsageManager) { - if (this.producerSystemUsage != null) { - removeService(this.producerSystemUsage); - } - this.producerSystemUsage = producerUsageManager; - addService(this.producerSystemUsage); - } - - public PersistenceAdapter getPersistenceAdapter() throws IOException { - if (persistenceAdapter == null) { - persistenceAdapter = createPersistenceAdapter(); - configureService(persistenceAdapter); - this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); - } - return persistenceAdapter; - } - - /** - * Sets the persistence adaptor implementation to use for this broker - * - * @throws IOException - */ - public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { - if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) { - LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); - return; - } - this.persistenceAdapter = persistenceAdapter; - configureService(this.persistenceAdapter); - this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); - } - - public TaskRunnerFactory getTaskRunnerFactory() { - if (this.taskRunnerFactory == null) { - this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, - isDedicatedTaskRunner()); - this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); - } - return this.taskRunnerFactory; - } - - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { - this.taskRunnerFactory = taskRunnerFactory; - } - - public TaskRunnerFactory getPersistenceTaskRunnerFactory() { - if (taskRunnerFactory == null) { - persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, - true, 1000, isDedicatedTaskRunner()); - } - return persistenceTaskRunnerFactory; - } - - public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { - this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; - } - - public boolean isUseJmx() { - return useJmx; - } - - public boolean isEnableStatistics() { - return enableStatistics; - } - - /** - * Sets whether or not the Broker's services enable statistics or not. - */ - public void setEnableStatistics(boolean enableStatistics) { - this.enableStatistics = enableStatistics; - } - - /** - * Sets whether or not the Broker's services should be exposed into JMX or - * not. - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" - */ - public void setUseJmx(boolean useJmx) { - this.useJmx = useJmx; - } - - public ObjectName getBrokerObjectName() throws MalformedObjectNameException { - if (brokerObjectName == null) { - brokerObjectName = createBrokerObjectName(); - } - return brokerObjectName; - } - - /** - * Sets the JMX ObjectName for this broker - */ - public void setBrokerObjectName(ObjectName brokerObjectName) { - this.brokerObjectName = brokerObjectName; - } - - public ManagementContext getManagementContext() { - if (managementContext == null) { - managementContext = new ManagementContext(); - } - return managementContext; - } - - public void setManagementContext(ManagementContext managementContext) { - this.managementContext = managementContext; - } - - public NetworkConnector getNetworkConnectorByName(String connectorName) { - for (NetworkConnector connector : networkConnectors) { - if (connector.getName().equals(connectorName)) { - return connector; - } - } - return null; - } - - public String[] getNetworkConnectorURIs() { - return networkConnectorURIs; - } - - public void setNetworkConnectorURIs(String[] networkConnectorURIs) { - this.networkConnectorURIs = networkConnectorURIs; - } - - public TransportConnector getConnectorByName(String connectorName) { - for (TransportConnector connector : transportConnectors) { - if (connector.getName().equals(connectorName)) { - return connector; - } - } - return null; - } - - public Map<String, String> getTransportConnectorURIsAsMap() { - Map<String, String> answer = new HashMap<String, String>(); - for (TransportConnector connector : transportConnectors) { - try { - URI uri = connector.getConnectUri(); - if (uri != null) { - String scheme = uri.getScheme(); - if (scheme != null) { - answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); - } - } - } catch (Exception e) { - LOG.debug("Failed to read URI to build transportURIsAsMap", e); - } - } - return answer; - } - - public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ - ProducerBrokerExchange result = null; - - for (TransportConnector connector : transportConnectors) { - for (TransportConnection tc: connector.getConnections()){ - result = tc.getProducerBrokerExchangeIfExists(producerInfo); - if (result !=null){ - return result; - } - } - } - return result; - } - - public String[] getTransportConnectorURIs() { - return transportConnectorURIs; - } - - public void setTransportConnectorURIs(String[] transportConnectorURIs) { - this.transportConnectorURIs = transportConnectorURIs; - } - - /** - * @return Returns the jmsBridgeConnectors. - */ - public JmsConnector[] getJmsBridgeConnectors() { - return jmsBridgeConnectors; - } - - /** - * @param jmsConnectors - * The jmsBridgeConnectors to set. - */ - public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { - this.jmsBridgeConnectors = jmsConnectors; - } - - public Service[] getServices() { - return services.toArray(new Service[0]); - } - - /** - * Sets the services associated with this broker. - */ - public void setServices(Service[] services) { - this.services.clear(); - if (services != null) { - for (int i = 0; i < services.length; i++) { - this.services.add(services[i]); - } - } - } - - /** - * Adds a new service so that it will be started as part of the broker - * lifecycle - */ - public void addService(Service service) { - services.add(service); - } - - public void removeService(Service service) { - services.remove(service); - } - - public boolean isUseLoggingForShutdownErrors() { - return useLoggingForShutdownErrors; - } - - /** - * Sets whether or not we should use commons-logging when reporting errors - * when shutting down the broker - */ - public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { - this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; - } - - public boolean isUseShutdownHook() { - return useShutdownHook; - } - - /** - * Sets whether or not we should use a shutdown handler to close down the - * broker cleanly if the JVM is terminated. It is recommended you leave this - * enabled. - */ - public void setUseShutdownHook(boolean useShutdownHook) { - this.useShutdownHook = useShutdownHook; - } - - public boolean isAdvisorySupport() { - return advisorySupport; - } - - /** - * Allows the support of advisory messages to be disabled for performance - * reasons. - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" - */ - public void setAdvisorySupport(boolean advisorySupport) { - this.advisorySupport = advisorySupport; - } - - public List<TransportConnector> getTransportConnectors() { - return new ArrayList<TransportConnector>(transportConnectors); - } - - /** - * Sets the transport connectors which this broker will listen on for new - * clients - * - * @org.apache.xbean.Property - * nestedType="org.apache.activemq.broker.TransportConnector" - */ - public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { - for (TransportConnector connector : transportConnectors) { - addConnector(connector); - } - } - - public TransportConnector getTransportConnectorByName(String name){ - for (TransportConnector transportConnector : transportConnectors){ - if (name.equals(transportConnector.getName())){ - return transportConnector; - } - } - return null; - } - - public TransportConnector getTransportConnectorByScheme(String scheme){ - for (TransportConnector transportConnector : transportConnectors){ - if (scheme.equals(transportConnector.getUri().getScheme())){ - return transportConnector; - } - } - return null; - } - - public List<NetworkConnector> getNetworkConnectors() { - return new ArrayList<NetworkConnector>(networkConnectors); - } - - public List<ProxyConnector> getProxyConnectors() { - return new ArrayList<ProxyConnector>(proxyConnectors); - } - - /** - * Sets the network connectors which this broker will use to connect to - * other brokers in a federated network - * - * @org.apache.xbean.Property - * nestedType="org.apache.activemq.network.NetworkConnector" - */ - public void setNetworkConnectors(List<?> networkConnectors) throws Exception { - for (Object connector : networkConnectors) { - addNetworkConnector((NetworkConnector) connector); - } - } - - /** - * Sets the network connectors which this broker will use to connect to - * other brokers in a federated network - */ - public void setProxyConnectors(List<?> proxyConnectors) throws Exception { - for (Object connector : proxyConnectors) { - addProxyConnector((ProxyConnector) connector); - } - } - - public PolicyMap getDestinationPolicy() { - return destinationPolicy; - } - - /** - * Sets the destination specific policies available either for exact - * destinations or for wildcard areas of destinations. - */ - public void setDestinationPolicy(PolicyMap policyMap) { - this.destinationPolicy = policyMap; - } - - public BrokerPlugin[] getPlugins() { - return plugins; - } - - /** - * Sets a number of broker plugins to install such as for security - * authentication or authorization - */ - public void setPlugins(BrokerPlugin[] plugins) { - this.plugins = plugins; - } - - public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { - return messageAuthorizationPolicy; - } - - /** - * Sets the policy used to decide if the current connection is authorized to - * consume a given message - */ - public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { - this.messageAuthorizationPolicy = messageAuthorizationPolicy; - } - - /** - * Delete all messages from the persistent store - * - * @throws IOException - */ - public void deleteAllMessages() throws IOException { - getPersistenceAdapter().deleteAllMessages(); - } - - public boolean isDeleteAllMessagesOnStartup() { - return deleteAllMessagesOnStartup; - } - - /** - * Sets whether or not all messages are deleted on startup - mostly only - * useful for testing. - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" - */ - public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { - this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; - } - - public URI getVmConnectorURI() { - if (vmConnectorURI == null) { - try { - vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_")); - } catch (URISyntaxException e) { - LOG.error("Badly formed URI from {}", getBrokerName(), e); - } - } - return vmConnectorURI; - } - - public void setVmConnectorURI(URI vmConnectorURI) { - this.vmConnectorURI = vmConnectorURI; - } - - public String getDefaultSocketURIString() { - if (started.get()) { - if (this.defaultSocketURIString == null) { - for (TransportConnector tc:this.transportConnectors) { - String result = null; - try { - result = tc.getPublishableConnectString(); - } catch (Exception e) { - LOG.warn("Failed to get the ConnectURI for {}", tc, e); - } - if (result != null) { - // find first publishable uri - if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { - this.defaultSocketURIString = result; - break; - } else { - // or use the first defined - if (this.defaultSocketURIString == null) { - this.defaultSocketURIString = result; - } - } - } - } - - } - return this.defaultSocketURIString; - } - return null; - } - - /** - * @return Returns the shutdownOnMasterFailure. - */ - public boolean isShutdownOnMasterFailure() { - return shutdownOnMasterFailure; - } - - /** - * @param shutdownOnMasterFailure - * The shutdownOnMasterFailure to set. - */ - public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { - this.shutdownOnMasterFailure = shutdownOnMasterFailure; - } - - public boolean isKeepDurableSubsActive() { - return keepDurableSubsActive; - } - - public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { - this.keepDurableSubsActive = keepDurableSubsActive; - } - - public boolean isUseVirtualTopics() { - return useVirtualTopics; - } - - /** - * Sets whether or not <a - * href="http://activemq.apache.org/virtual-destinations.html">Virtual - * Topics</a> should be supported by default if they have not been - * explicitly configured. - */ - public void setUseVirtualTopics(boolean useVirtualTopics) { - this.useVirtualTopics = useVirtualTopics; - } - - public DestinationInterceptor[] getDestinationInterceptors() { - return destinationInterceptors; - } - - public boolean isUseMirroredQueues() { - return useMirroredQueues; - } - - /** - * Sets whether or not <a - * href="http://activemq.apache.org/mirrored-queues.html">Mirrored - * Queues</a> should be supported by default if they have not been - * explicitly configured. - */ - public void setUseMirroredQueues(boolean useMirroredQueues) { - this.useMirroredQueues = useMirroredQueues; - } - - /** - * Sets the destination interceptors to use - */ - public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { - this.destinationInterceptors = destinationInterceptors; - } - - public ActiveMQDestination[] getDestinations() { - return destinations; - } - - /** - * Sets the destinations which should be loaded/created on startup - */ - public void setDestinations(ActiveMQDestination[] destinations) { - this.destinations = destinations; - } - - /** - * @return the tempDataStore - */ - public synchronized PListStore getTempDataStore() { - if (tempDataStore == null) { - if (!isPersistent()) { - return null; - } - - try { - PersistenceAdapter pa = getPersistenceAdapter(); - if( pa!=null && pa instanceof PListStore) { - return (PListStore) pa; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - - boolean result = true; - boolean empty = true; - try { - File directory = getTmpDataDirectory(); - if (directory.exists() && directory.isDirectory()) { - File[] files = directory.listFiles(); - if (files != null && files.length > 0) { - empty = false; - for (int i = 0; i < files.length; i++) { - File file = files[i]; - if (!file.isDirectory()) { - result &= file.delete(); - } - } - } - } - if (!empty) { - String str = result ? "Successfully deleted" : "Failed to delete"; - LOG.info("{} temporary storage", str); - } - - String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; - this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance(); - this.tempDataStore.setDirectory(getTmpDataDirectory()); - configureService(tempDataStore); - this.tempDataStore.start(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return tempDataStore; - } - - /** - * @param tempDataStore - * the tempDataStore to set - */ - public void setTempDataStore(PListStore tempDataStore) { - this.tempDataStore = tempDataStore; - configureService(tempDataStore); - try { - tempDataStore.start(); - } catch (Exception e) { - RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e); - LOG.error(exception.getLocalizedMessage(), e); - throw exception; - } - } - - public int getPersistenceThreadPriority() { - return persistenceThreadPriority; - } - - public void setPersistenceThreadPriority(int persistenceThreadPriority) { - this.persistenceThreadPriority = persistenceThreadPriority; - } - - /** - * @return the useLocalHostBrokerName - */ - public boolean isUseLocalHostBrokerName() { - return this.useLocalHostBrokerName; - } - - /** - * @param useLocalHostBrokerName - * the useLocalHostBrokerName to set - */ - public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { - this.useLocalHostBrokerName = useLocalHostBrokerName; - if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { - brokerName = LOCAL_HOST_NAME; - } - } - - /** - * Looks up and lazily creates if necessary the destination for the given - * JMS name - */ - public Destination getDestination(ActiveMQDestination destination) throws Exception { - return getBroker().addDestination(getAdminConnectionContext(), destination,false); - } - - public void removeDestination(ActiveMQDestination destination) throws Exception { - getBroker().removeDestination(getAdminConnectionContext(), destination, 0); - } - - public int getProducerSystemUsagePortion() { - return producerSystemUsagePortion; - } - - public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { - this.producerSystemUsagePortion = producerSystemUsagePortion; - } - - public int getConsumerSystemUsagePortion() { - return consumerSystemUsagePortion; - } - - public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { - this.consumerSystemUsagePortion = consumerSystemUsagePortion; - } - - public boolean isSplitSystemUsageForProducersConsumers() { - return splitSystemUsageForProducersConsumers; - } - - public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { - this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; - } - - public boolean isMonitorConnectionSplits() { - return monitorConnectionSplits; - } - - public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { - this.monitorConnectionSplits = monitorConnectionSplits; - } - - public int getTaskRunnerPriority() { - return taskRunnerPriority; - } - - public void setTaskRunnerPriority(int taskRunnerPriority) { - this.taskRunnerPriority = taskRunnerPriority; - } - - public boolean isDedicatedTaskRunner() { - return dedicatedTaskRunner; - } - - public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { - this.dedicatedTaskRunner = dedicatedTaskRunner; - } - - public boolean isCacheTempDestinations() { - return cacheTempDestinations; - } - - public void setCacheTempDestinations(boolean cacheTempDestinations) { - this.cacheTempDestinations = cacheTempDestinations; - } - - public int getTimeBeforePurgeTempDestinations() { - return timeBeforePurgeTempDestinations; - } - - public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { - this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; - } - - public boolean isUseTempMirroredQueues() { - return useTempMirroredQueues; - } - - public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { - this.useTempMirroredQueues = useTempMirroredQueues; - } - - public synchronized JobSchedulerStore getJobSchedulerStore() { - - // If support is off don't allow any scheduler even is user configured their own. - if (!isSchedulerSupport()) { - return null; - } - - // If the user configured their own we use it even if persistence is disabled since - // we don't know anything about their implementation. - if (jobSchedulerStore == null) { - - if (!isPersistent()) { - this.jobSchedulerStore = new InMemoryJobSchedulerStore(); - configureService(jobSchedulerStore); - return this.jobSchedulerStore; - } - - try { - PersistenceAdapter pa = getPersistenceAdapter(); - if (pa != null) { - this.jobSchedulerStore = pa.createJobSchedulerStore(); - jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); - configureService(jobSchedulerStore); - return this.jobSchedulerStore; - } - } catch (IOException e) { - throw new RuntimeException(e); - } catch (UnsupportedOperationException ex) { - // It's ok if the store doesn't implement a scheduler. - } catch (Exception e) { - throw new RuntimeException(e); - } - - try { - PersistenceAdapter pa = getPersistenceAdapter(); - if (pa != null && pa instanceof JobSchedulerStore) { - this.jobSchedulerStore = (JobSchedulerStore) pa; - configureService(jobSchedulerStore); - return this.jobSchedulerStore; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - - // Load the KahaDB store as a last resort, this only works if KahaDB is - // included at runtime, otherwise this will fail. User should disable - // scheduler support if this fails. - try { - String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; - PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); - jobSchedulerStore = adaptor.createJobSchedulerStore(); - jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); - configureService(jobSchedulerStore); - LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return jobSchedulerStore; - } - - public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { - this.jobSchedulerStore = jobSchedulerStore; - configureService(jobSchedulerStore); - } - - // - // Implementation methods - // ------------------------------------------------------------------------- - /** - * Handles any lazy-creation helper properties which are added to make - * things easier to configure inside environments such as Spring - * - * @throws Exception - */ - protected void processHelperProperties() throws Exception { - if (transportConnectorURIs != null) { - for (int i = 0; i < transportConnectorURIs.length; i++) { - String uri = transportConnectorURIs[i]; - addConnector(uri); - } - } - if (networkConnectorURIs != null) { - for (int i = 0; i < networkConnectorURIs.length; i++) { - String uri = networkConnectorURIs[i]; - addNetworkConnector(uri); - } - } - if (jmsBridgeConnectors != null) { - for (int i = 0; i < jmsBridgeConnectors.length; i++) { - addJmsConnector(jmsBridgeConnectors[i]); - } - } - } - - public static int t = 0; - /** - * Check that the store usage limit is not greater than max usable - * space and adjust if it is - */ - protected void checkStoreUsageLimits() throws IOException { - final SystemUsage usage = getSystemUsage(); - - if (getPersistenceAdapter() != null) { - PersistenceAdapter adapter = getPersistenceAdapter(); - File dir = adapter.getDirectory(); - - if (dir != null) { - dir = StoreUtil.findParentDirectory(dir); - - int usagePercent = usage.getStoreUsage().getPercentLimit(); - long storeLimit = usage.getStoreUsage().getLimit(); - long storeCurrent = usage.getStoreUsage().getUsage(); - long dirFreeSpace = dir.getUsableSpace(); - - if (diskUsageCheckRegrowPercentChange && usagePercent > 0 && - storeLimit < (dirFreeSpace + storeCurrent)) { - LOG.info("Usable disk space has been increased, attempting to regrow store limit to " + - + usagePercent + "% of the partition size."); - - //trigger the recomputation of the absolute value by - //calling setPercentLimit again - usage.getStoreUsage().setPercentLimit(usagePercent); - storeLimit = usage.getStoreUsage().getLimit(); - storeCurrent = usage.getStoreUsage().getUsage(); - } - - if (storeLimit > (dirFreeSpace + storeCurrent)) { - if (usagePercent > 0) { - LOG.warn("Store limit has been set to " - + usagePercent + "% of the partition size but " - + "there is not enough usable space."); - } - - LOG.warn("Store limit is " + storeLimit / (1024 * 1024) + - " mb (current store usage is " + storeCurrent / (1024 * 1024) + - " mb). The data directory: " + dir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + - " mb of usable space - resetting to maximum available disk space: " + - (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); - usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent); - } - } - - long maxJournalFileSize = 0; - long storeLimit = usage.getStoreUsage().getLimit(); - - if (adapter instanceof JournaledStore) { - maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); - } - - if (storeLimit < maxJournalFileSize) { - LOG.error("Store limit is " + storeLimit / (1024 * 1024) + - " mb, whilst the max journal file size for the store is: " + - maxJournalFileSize / (1024 * 1024) + " mb, " + - "the store will not accept any data when used."); - - } - } - } - - /** - * Check that temporary usage limit is not greater than max usable - * space and adjust if it is - */ - protected void checkTmpStoreUsageLimits() throws IOException { - final SystemUsage usage = getSystemUsage(); - - File tmpDir = getTmpDataDirectory(); - - if (tmpDir != null) { - tmpDir = StoreUtil.findParentDirectory(tmpDir); - - long storeLimit = usage.getTempUsage().getLimit(); -<<<<<<< 4f8d56aaf60f99abe643e79c6c4940a571289a86 - long storeCurrent = usage.getTempUsage().getUsage(); - while (tmpDir != null && !tmpDir.isDirectory()) { - tmpDir = tmpDir.getParentFile(); - } - long dirFreeSpace = tmpDir.getUsableSpace(); - if (storeLimit > (dirFreeSpace + storeCurrent)) { - LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) + - " mb (current temporary store usage is " + storeCurrent / (1024 * 1024) + - " mb). The temporary data directory: " + tmpDir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + - " mb of usable space - resetting to maximum available disk space: " + - (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); - usage.getTempUsage().setLimit(dirFreeSpace + storeCurrent); -======= - long dirFreeSpace = tmpDir.getUsableSpace(); - int usagePercent = usage.getTempUsage().getPercentLimit(); - -// if (diskUsageCheckRegrowPercentChange && usagePercent > 0 && -// storeLimit < (dirFreeSpace + storeCurrent)) { -// LOG.info("Usable disk space has been increased, attempting to regrow store limit to " + -// + usagePercent + "% of the partition size."); -// -// //trigger the recomputation of the absolute value by -// //calling setPercentLimit again -// usage.getStoreUsage().setPercentLimit(usagePercent); -// storeLimit = usage.getStoreUsage().getLimit(); -// storeCurrent = usage.getStoreUsage().getUsage(); -// } - - if (storeLimit > dirFreeSpace) { - if (usagePercent > 0) { - LOG.warn("Temporary Store limit has been set to " - + usagePercent + "% of the partition size but " - + "there is not enough usable space."); - } - - LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) + - " mb, whilst the temporary data directory: " + tmpDir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to maximum available " + - dirFreeSpace / (1024 * 1024) + " mb."); - usage.getTempUsage().setLimit(dirFreeSpace); ->>>>>>> https://issues.apache.org/jira/browse/AMQ-5965 - } - - if (isPersistent()) { - long maxJournalFileSize; - - PListStore store = usage.getTempUsage().getStore(); - if (store != null && store instanceof JournaledStore) { - maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); - } else { - maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; - } - - if (storeLimit < maxJournalFileSize) { - LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + - " mb, whilst the max journal file size for the temporary store is: " + - maxJournalFileSize / (1024 * 1024) + " mb, " + - "the temp store will not accept any data when used."); - } - } - } - } - - /** - * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to - * update store and temporary store limits if the amount of available space - * plus current store size is less than the existin configured limit - */ - protected void scheduleDiskUsageLimitsCheck() throws IOException { - if (schedulePeriodForDiskUsageCheck > 0 && - (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) { - Runnable diskLimitCheckTask = new Runnable() { - @Override - public void run() { - try { - checkStoreUsageLimits(); - } catch (IOException e) { - LOG.error("Failed to check persistent disk usage limits", e); - } - - try { - checkTmpStoreUsageLimits(); - } catch (IOException e) { - LOG.error("Failed to check temporary store usage limits", e); - } - } - }; - scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck); - } - } - - protected void checkSystemUsageLimits() throws IOException { - final SystemUsage usage = getSystemUsage(); - long memLimit = usage.getMemoryUsage().getLimit(); - long jvmLimit = Runtime.getRuntime().maxMemory(); - - if (memLimit > jvmLimit) { - usage.getMemoryUsage().setPercentOfJvmHeap(70); - LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) + - " mb) is more than the maximum available for the JVM: " + - jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); - } - - //Check the persistent store and temp store limits if they exist - //and schedule a periodic check to update disk limits if - //schedulePeriodForDiskLimitCheck is set - checkStoreUsageLimits(); - checkTmpStoreUsageLimits(); - scheduleDiskUsageLimitsCheck(); - - if (getJobSchedulerStore() != null) { - JobSchedulerStore scheduler = getJobSchedulerStore(); - File schedulerDir = scheduler.getDirectory(); - if (schedulerDir != null) { - - String schedulerDirPath = schedulerDir.getAbsolutePath(); - if (!schedulerDir.isAbsolute()) { - schedulerDir = new File(schedulerDirPath); - } - - while (schedulerDir != null && !schedulerDir.isDirectory()) { - schedulerDir = schedulerDir.getParentFile(); - } - long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); - long dirFreeSpace = schedulerDir.getUsableSpace(); - if (schedulerLimit > dirFreeSpace) { - LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + - " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + - " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " + - dirFreeSpace / (1024 * 1024) + " mb."); - usage.getJobSchedulerUsage().setLimit(dirFreeSpace); - } - } - } - } - - public void stopAllConnectors(ServiceStopper stopper) { - for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { - NetworkConnector connector = iter.next(); - unregisterNetworkConnectorMBean(connector); - stopper.stop(connector); - } - for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { - ProxyConnector connector = iter.next(); - stopper.stop(connector); - } - for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { - JmsConnector connector = iter.next(); - stopper.stop(connector); - } - for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { - TransportConnector connector = iter.next(); - try { - unregisterConnectorMBean(connector); - } catch (IOException e) { - } - stopper.stop(connector); - } - } - - protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { - try { - ObjectName objectName = createConnectorObjectName(connector); - connector = connector.asManagedConnector(getManagementContext(), objectName); - ConnectorViewMBean view = new ConnectorView(connector); - AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); - return connector; - } catch (Throwable e) { - throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); - } - } - - protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { - if (isUseJmx()) { - try { - ObjectName objectName = createConnectorObjectName(connector); - getManagementContext().unregisterMBean(objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create( - "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); - } - } - } - - protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { - return adaptor; - } - - protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { - if (isUseJmx()) {} - } - - private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { - return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); - } - - public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { - NetworkConnectorViewMBean view = new NetworkConnectorView(connector); - try { - ObjectName objectName = createNetworkConnectorObjectName(connector); - connector.setObjectName(objectName); - AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); - } - } - - protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { - return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); - } - - public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { - return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); - } - - protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { - if (isUseJmx()) { - try { - ObjectName objectName = createNetworkConnectorObjectName(connector); - getManagementContext().unregisterMBean(objectName); - } catch (Exception e) { - LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e); - } - } - } - - protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { - ProxyConnectorView view = new ProxyConnectorView(connector); - try { - ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); - AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); - } - } - - protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { - JmsConnectorView view = new JmsConnectorView(connector); - try { - ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); - AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); - } - } - - /** - * Factory method to create a new broker - * - * @ <TRUNCATED>