Repository: activemq
Updated Branches:
  refs/heads/master 66cfc7bab -> db1506a59
https://issues.apache.org/jira/browse/AMQ-6124 - fix and test - propagate broker info from prestarted backup transport Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/db1506a5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/db1506a5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/db1506a5 Branch: refs/heads/master Commit: db1506a5921f70134c3b647cec51204f0e1c1416 Parents: 66cfc7b Author: gtully <gary.tu...@gmail.com> Authored: Tue Jan 12 14:00:13 2016 +0000 Committer: gtully <gary.tu...@gmail.com> Committed: Tue Jan 12 14:00:13 2016 +0000 ---------------------------------------------------------------------- .../transport/failover/BackupTransport.java | 16 +++++++++++++++- .../transport/failover/FailoverTransport.java | 1 + .../failover/FailoverClusterTestSupport.java | 6 ++++++ .../transport/failover/FailoverPriorityTest.java | 5 ++++- .../failover/FailoverTransportBackupsTest.java | 15 +++++++++++++-- 5 files changed, 39 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java index f6df0a4..9c591e2 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.failover; +import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.transport.DefaultTransportListener; import 
org.apache.activemq.transport.Transport; @@ -29,10 +30,12 @@ class BackupTransport extends DefaultTransportListener{ private Transport transport; private URI uri; private boolean disposed; - + private BrokerInfo brokerInfo; + BackupTransport(FailoverTransport ft){ this.failoverTransport=ft; } + @Override public void onException(IOException error) { this.disposed=true; @@ -41,6 +44,17 @@ class BackupTransport extends DefaultTransportListener{ } } + @Override + public void onCommand(Object command) { + if (command instanceof BrokerInfo) { + brokerInfo = (BrokerInfo) command; + } + } + + public BrokerInfo getBrokerInfo() { + return brokerInfo; + } + public Transport getTransport() { return transport; } http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 7f7d7c6..dcb0867 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -991,6 +991,7 @@ public class FailoverTransport implements CompositeTransport { backups.remove(bt); transport = bt.getTransport(); uri = bt.getUri(); + myTransportListener.onCommand(bt.getBrokerInfo()); if (priorityBackup && priorityBackupAvailable) { Transport old = this.connectedTransport.getAndSet(null); if (old != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java ---------------------------------------------------------------------- diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java index c5e9665..01dcce4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java @@ -110,6 +110,12 @@ public class FailoverClusterTestSupport extends TestCase { } } + protected void assertBrokerInfo(String brokerName) throws Exception { + for (ActiveMQConnection c : connections) { + assertEquals(brokerName, c.getBrokerInfo().getBrokerName()); + } + } + protected void addBroker(String name, BrokerService brokerService) { brokers.put(name, brokerService); } http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java index bed5183..72137dd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java @@ -52,7 +52,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { createClients(5); assertAllConnectedTo(urls.get(BROKER_A_NAME)); - + assertBrokerInfo(BROKER_A_NAME); restart(false, BROKER_A_NAME, BROKER_B_NAME); @@ -169,8 +169,10 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { if (primary) { assertAllConnectedTo(urls.get(secondaryName)); + assertBrokerInfo(secondaryName); } else { 
assertAllConnectedTo(urls.get(primaryName)); + assertBrokerInfo(primaryName); } if (primary) { @@ -186,6 +188,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { Thread.sleep(5000); assertAllConnectedTo(urls.get(primaryName)); + assertBrokerInfo(primaryName); } http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java index b1c8a1b..ed39268 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.net.URI; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; @@ -111,9 +113,10 @@ public class FailoverTransportBackupsTest { } })); + assertEquals("conected to..", "1", currentBrokerInfo.getBrokerName()); broker1.stop(); - assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); @@ -124,9 +127,10 @@ public class FailoverTransportBackupsTest { assertTrue("Incorrect number of Transport interruptions", 
transportInterruptions >= 1); assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1); + assertEquals("conected to..", "2", currentBrokerInfo.getBrokerName()); broker2.stop(); - assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); @@ -136,6 +140,8 @@ public class FailoverTransportBackupsTest { assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2); assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2); + + assertEquals("conected to..", "3", currentBrokerInfo.getBrokerName()); } @Test @@ -183,6 +189,7 @@ public class FailoverTransportBackupsTest { return bs; } + BrokerInfo currentBrokerInfo; protected Transport createTransport(int backups) throws Exception { String connectionUri = "failover://("+ broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," + @@ -199,6 +206,10 @@ public class FailoverTransportBackupsTest { @Override public void onCommand(Object command) { LOG.debug("Test Transport Listener received Command: " + command); + if (command instanceof BrokerInfo) { + currentBrokerInfo = (BrokerInfo) command; + LOG.info("BrokerInfo: " + currentBrokerInfo); + } } @Override