Author: rajdavies
Date: Wed May 28 08:19:30 2008
New Revision: 660977
URL: http://svn.apache.org/viewvc?rev=660977&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1541
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Wed May 28 08:19:30 2008
@@ -333,6 +333,27 @@
LOG.warn("Failed to fire message is full advisory");
}
}
+
+ /**
+ * Publishes an advisory announcing that this broker has become the master.
+ * The notification is first passed on through super.nowMasterBroker(); a
+ * message carrying the broker name and a connect address is then fired on
+ * the topic returned by AdvisorySupport.getMasterBrokerAdvisoryTopic().
+ * Any exception raised while building or firing the advisory is logged as
+ * a warning and is not rethrown.
+ */
+ public void nowMasterBroker() {
+ super.nowMasterBroker();
+ try {
+ ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setStringProperty("brokerName", getBrokerName());
+ // Prefer the first transport connector URI; otherwise use the VM connector URI.
+ String[] uris = getBrokerService().getTransportConnectorURIs();
+ String uri = getBrokerService().getVmConnectorURI().toString();
+ if (uris != null && uris.length > 0) {
+ uri = uris[0];
+ }
+ // "brokerURL" was previously filled with the broker name, duplicating
+ // "brokerName"; it now carries the address, the same value as "brokerURI".
+ advisoryMessage.setStringProperty("brokerURL", uri);
+ advisoryMessage.setStringProperty("brokerURI", uri);
+ ConnectionContext context = new ConnectionContext();
+ context.setBroker(getBrokerService().getBroker());
+ fireAdvisory(context, topic, advisoryMessage);
+ } catch (Exception e) {
+ // Hand the exception to the logger so the cause of the failure is not lost.
+ LOG.warn("Failed to fire message master broker advisory", e);
+ }
+ }
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic
topic, Command command) throws Exception {
fireAdvisory(context, topic, command, null);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Wed May 28 08:19:30 2008
@@ -45,6 +45,7 @@
public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX +
"FULL.";
public static final String MESSAGE_DELIVERED_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
public static final String MESSAGE_CONSUMED_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+ public static final String MASTER_BROKER_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "MasterBroker";
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
@@ -137,6 +138,10 @@
return new ActiveMQTopic(name);
}
+ /**
+ * Builds the topic used for master broker advisories.
+ *
+ * @return a new ActiveMQTopic named MASTER_BROKER_TOPIC_PREFIX
+ */
+ public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
+ final ActiveMQTopic masterBrokerTopic = new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+ return masterBrokerTopic;
+ }
+
public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination
destination) {
String name = FULL_TOPIC_PREFIX
+ destination.getDestinationTypeAsString() + "."
@@ -272,6 +277,20 @@
}
}
+ /**
+ * Tests whether a destination is, or contains, a master broker advisory topic.
+ *
+ * @param destination the destination to inspect; a composite destination is
+ * checked member by member, recursively
+ * @return true if the destination is a topic whose physical name starts with
+ * MASTER_BROKER_TOPIC_PREFIX, or a composite with at least one such member
+ */
+ public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
+ if (!destination.isComposite()) {
+ if (!destination.isTopic()) {
+ return false;
+ }
+ return destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
+ }
+ for (ActiveMQDestination member : destination.getCompositeDestinations()) {
+ if (isMasterBrokerAdvisoryTopic(member)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination
destination) {
if (destination.isComposite()) {
ActiveMQDestination[] compositeDestinations =
destination.getCompositeDestinations();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Wed May 28 08:19:30 2008
@@ -365,6 +365,12 @@
* @param usage
*/
void isFull(ConnectionContext context,Destination destination,Usage usage);
+
+ /**
+ * Called when this broker becomes the master in a master/slave
+ * configuration.
+ */
+ void nowMasterBroker();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Wed May 28 08:19:30 2008
@@ -290,4 +290,8 @@
public void slowConsumer(ConnectionContext context, Destination
destination,Subscription subs) {
next.slowConsumer(context, destination,subs);
}
+
+ public void nowMasterBroker() { // forwards the notification unchanged to the wrapped "next" broker
+ next.nowMasterBroker();
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Wed May 28 08:19:30 2008
@@ -409,6 +409,7 @@
LOG.warn("Master Failed - starting all connectors");
try {
startAllConnectors();
+ broker.nowMasterBroker();
} catch (Exception e) {
LOG.error("Failed to startAllConnectors");
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Wed May 28 08:19:30 2008
@@ -221,7 +221,7 @@
public void setAdminConnectionContext(ConnectionContext
adminConnectionContext) {
}
- public Response messagePull(ConnectionContext context, MessagePull pull) {
+ public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception {
return null;
}
@@ -275,4 +275,7 @@
public void slowConsumer(ConnectionContext context,Destination
destination, Subscription subs) {
}
+
+ public void nowMasterBroker() { // no-op: the body is empty
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Wed May 28 08:19:30 2008
@@ -292,4 +292,8 @@
public void slowConsumer(ConnectionContext context, Destination
destination,Subscription subs) {
throw new BrokerStoppedException(this.message);
}
+
+ public void nowMasterBroker() { // always fails: throws BrokerStoppedException carrying this broker's message
+ throw new BrokerStoppedException(this.message);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Wed May 28 08:19:30 2008
@@ -302,5 +302,9 @@
public void slowConsumer(ConnectionContext context, Destination dest,
Subscription subs) {
getNext().slowConsumer(context, dest,subs);
}
+
+ public void nowMasterBroker() { // forwards the notification to whatever getNext() currently returns
+ getNext().nowMasterBroker();
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed May 28 08:19:30 2008
@@ -25,15 +25,14 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -52,13 +51,11 @@
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
@@ -71,7 +68,7 @@
*
* @version $Revision$
*/
-public class RegionBroker implements Broker {
+public class RegionBroker extends EmptyBroker {
private static final Log LOG = LogFactory.getLog(RegionBroker.class);
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
@@ -324,12 +321,6 @@
return rc;
}
- public void addSession(ConnectionContext context, SessionInfo info) throws
Exception {
- }
-
- public void removeSession(ConnectionContext context, SessionInfo info)
throws Exception {
- }
-
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
ActiveMQDestination destination = info.getDestination();
@@ -619,10 +610,6 @@
return destinationFactory.getDestinations();
}
- public boolean isFaultTolerantConfiguration() {
- return false;
- }
-
protected void doStop(ServiceStopper ss) {
ss.stop(queueRegion);
ss.stop(topicRegion);
@@ -680,24 +667,6 @@
getRoot().sendToDeadLetterQueue(context, node);
}
- public void fastProducer(ConnectionContext context,ProducerInfo
producerInfo) {
- }
-
- public void isFull(ConnectionContext context,Destination destination,
Usage usage) {
- }
-
- public void messageConsumed(ConnectionContext context,MessageReference
messageReference) {
- }
-
- public void messageDelivered(ConnectionContext context,MessageReference
messageReference) {
- }
-
- public void messageDiscarded(ConnectionContext context,MessageReference
messageReference) {
- }
-
- public void slowConsumer(ConnectionContext context, Destination dest,
Subscription subs) {
- }
-
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference node){
try{
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Wed May 28 08:19:30 2008
@@ -921,7 +921,7 @@
- protected void lock() throws IOException, InterruptedException {
+ protected void lock() throws Exception {
boolean logged = false;
boolean aquiredLock = false;
do {
@@ -937,6 +937,9 @@
if (aquiredLock && logged) {
LOG.info("Aquired lock for AMQ Store" + getDirectory());
+ if (brokerService != null) {
+ brokerService.getBroker().nowMasterBroker();
+ }
}
} while (!aquiredLock && !disableLocking);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Wed May 28 08:19:30 2008
@@ -175,6 +175,9 @@
LOG.warn("No databaseLocker configured for the JDBC
Persistence Adapter");
} else {
service.start();
+ if (brokerService != null) {
+ brokerService.getBroker().nowMasterBroker();
+ }
}
}