http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java index ffdfc6e..d34f943 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -16,22 +16,25 @@ */ package org.apache.activemq.broker; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.ServerSocket; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.WeakHashMap; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.Service; @@ -44,6 +47,8 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.network.ConnectionFilter; +import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.jms.JmsConnector; import org.apache.activemq.proxy.ProxyConnector; @@ -57,6 +62,7 @@ import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.IOExceptionHandler; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ServiceStopper; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,10 +74,12 @@ import org.slf4j.LoggerFactory; public class BrokerService implements Service { public static final String DEFAULT_PORT = "61616"; + public static final AtomicInteger RANDOM_PORT_BASE = new AtomicInteger(51616); public static final String DEFAULT_BROKER_NAME = "localhost"; public static final String BROKER_VERSION; public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; public static final long DEFAULT_START_TIMEOUT = 600000L; + public static boolean disableWrapper = false; public String SERVER_SIDE_KEYSTORE; public String KEYSTORE_PASSWORD; @@ -99,6 +107,11 @@ public class BrokerService implements Service { private PolicyMap destinationPolicy; private SystemUsage systemUsage; + private boolean isClustered = true; + private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); + + private TemporaryFolder tmpfolder; + public static WeakHashMap<Broker, Exception> map = new WeakHashMap<>(); static { @@ -131,6 +144,10 @@ public class BrokerService implements Service { @Override public void start() throws Exception { + File targetTmp = new File("./target/tmp"); + targetTmp.mkdirs(); + tmpfolder = new TemporaryFolder(targetTmp); + tmpfolder.create(); Exception e = new Exception(); e.fillInStackTrace(); startBroker(startAsync); @@ -188,10 +205,10 @@ public class BrokerService implements Service { LOG.info("Apache ActiveMQ Artemis{} ({}, {}) is shutting down", new Object[]{getBrokerVersion(), getBrokerName(), brokerId}); if (broker != null) { - System.out.println("______________________stopping broker: " + broker.getClass().getName()); broker.stop(); broker = null; } + tmpfolder.delete(); LOG.info("Apache ActiveMQ Artemis {} ({}, {}) is shutdown", new Object[]{getBrokerVersion(), getBrokerName(), brokerId}); } @@ -200,7 +217,7 @@ public class BrokerService implements Service { public Broker getBroker() throws Exception { if (broker == null) { - broker = createBroker(); + broker = createBroker(tmpfolder.getRoot()); } return broker; } @@ -220,13 +237,14 @@ public class BrokerService implements Service { this.brokerName = str.trim(); } - protected Broker createBroker() throws Exception { - broker = createBrokerWrapper(); + protected Broker createBroker(File temporaryFile) throws Exception { + new Exception("file=" + temporaryFile.getAbsolutePath()).printStackTrace(); + broker = createBrokerWrapper(temporaryFile); return broker; } - private Broker createBrokerWrapper() { - return new ArtemisBrokerWrapper(this); + private Broker createBrokerWrapper(File temporaryFile) { + return new ArtemisBrokerWrapper(this, temporaryFile); } public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception { @@ -382,10 +400,6 @@ public class BrokerService implements Service { public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { } - public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { - return null; - } - public TransportConnector getConnectorByName(String connectorName) { return null; } @@ -407,8 +421,17 @@ public class BrokerService implements Service { public void setSchedulerDirectoryFile(File schedulerDirectory) { } + public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { + return addNetworkConnector(new URI(discoveryAddress)); + } + + public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { + NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); + return addNetworkConnector(connector); + } + public List<NetworkConnector> getNetworkConnectors() { - return new ArrayList<>(); + return this.networkConnectors; } public void setSchedulerSupport(boolean schedulerSupport) { @@ -471,6 +494,30 @@ public class BrokerService implements Service { } public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { + connector.setBrokerService(this); + + System.out.println("------------------------ this broker uri: " + this.getConnectURI()); + connector.setLocalUri(this.getConnectURI()); + // Set a connection filter so that the connector does not establish loop + // back connections. + connector.setConnectionFilter(new ConnectionFilter() { + @Override + public boolean connectTo(URI location) { + List<TransportConnector> transportConnectors = getTransportConnectors(); + for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { + try { + TransportConnector tc = iter.next(); + if (location.equals(tc.getConnectUri())) { + return false; + } + } catch (Throwable e) { + } + } + return true; + } + }); + + networkConnectors.add(connector); return connector; } @@ -486,19 +533,63 @@ public class BrokerService implements Service { public TransportConnector addConnector(URI bindAddress) throws Exception { Integer port = bindAddress.getPort(); + String host = bindAddress.getHost(); FakeTransportConnector connector = null; - if (port != 0) { - connector = new FakeTransportConnector(bindAddress); - this.transportConnectors.add(connector); - this.extraConnectors.add(port); + + host = (host == null || host.length() == 0) ? "localhost" : host; + if ("0.0.0.0".equals(host)) { + host = "localhost"; } - else { - connector = new FakeTransportConnector(new URI(this.getDefaultUri())); - this.transportConnectors.add(connector); + + if (port == 0) { + //In actual impl in amq5, after connector has been added the socket + //is bound already. This means in case of 0 port uri, the random + //port is available after this call. With artemis wrapper however + //the real binding happens during broker start. To work around this + //we use manually calculated port for that. + port = getPseudoRandomPort(); + } + + System.out.println("Now host is: " + host); + bindAddress = new URI(bindAddress.getScheme(), bindAddress.getUserInfo(), + host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment()); + + connector = new FakeTransportConnector(bindAddress); + this.transportConnectors.add(connector); + this.extraConnectors.add(port); + return connector; } + private int getPseudoRandomPort() { + int port = RANDOM_PORT_BASE.getAndIncrement(); + while (!checkPort(port)) { + port = RANDOM_PORT_BASE.getAndIncrement(); + } + return port; + } + + private static boolean checkPort(final int port) { + ServerSocket ssocket = null; + try { + ssocket = new ServerSocket(port); + } + catch (Exception e) { + return false; + } + finally { + if (ssocket != null) { + try { + ssocket.close(); + } + catch (IOException e) { + } + } + } + return true; + } + public void setCacheTempDestinations(boolean cacheTempDestinations) { }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java index 5c052a6..fb3c242 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java @@ -17,7 +17,6 @@ package org.apache.activemq.broker.artemiswrapper; import java.io.File; -import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -65,7 +64,6 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.PListStore; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.usage.Usage; -import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,20 +81,19 @@ public abstract class ArtemisBrokerBase implements Broker { protected volatile boolean stopped; protected BrokerId brokerId = new BrokerId("Artemis Broker"); protected BrokerService bservice; - protected TemporaryFolder temporaryFolder = new TemporaryFolder(); - protected String testDir; + + protected final File temporaryFolder; + protected final String testDir; protected boolean realStore = false; protected ActiveMQServer server; protected boolean enableSecurity = false; - public ArtemisBrokerBase() { - try { - this.temporaryFolder.create(); - } - catch (IOException e) { - } + public ArtemisBrokerBase(File temporaryFolder) { + this.temporaryFolder = temporaryFolder; + this.testDir = temporaryFolder.getAbsolutePath(); + } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 61d6250..3ad6072 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.artemiswrapper; +import java.io.File; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,20 +47,16 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { protected final Map<String, SimpleString> testQueues = new HashMap<>(); protected JMSServerManagerImpl jmsServer; - public ArtemisBrokerWrapper(BrokerService brokerService) { + public ArtemisBrokerWrapper(BrokerService brokerService, File temporaryFolder) { + super(temporaryFolder); this.bservice = brokerService; } @Override public void start() throws Exception { - testDir = temporaryFolder.getRoot().getAbsolutePath(); clearDataRecreateServerDirs(); server = createServer(realStore, true); server.getConfiguration().getAcceptorConfigurations().clear(); - HashMap<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, "61616"); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE,CORE"); - TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); Configuration serverConfig = server.getConfiguration(); @@ -82,9 +79,11 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { commonSettings.setDeadLetterAddress(dla); commonSettings.setAutoCreateJmsQueues(true); - serverConfig.getAcceptorConfigurations().add(transportConfiguration); + HashMap<String, Object> params = new HashMap<String, Object>(); + if (bservice.extraConnectors.size() == 0) { + serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE"); + } if (this.bservice.enableSsl()) { - params = new HashMap<>(); params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true); params.put(TransportConstants.PORT_PROP_NAME, 61611); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE"); @@ -102,14 +101,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { } for (Integer port : bservice.extraConnectors) { - if (port.intValue() != 61616) { - //extra port - params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, port.intValue()); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE"); - TransportConfiguration extraTransportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - serverConfig.getAcceptorConfigurations().add(extraTransportConfiguration); - } + serverConfig.addAcceptorConfiguration("homePort" + port, "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE"); } serverConfig.setSecurityEnabled(enableSecurity); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java new file mode 100644 index 0000000..be9cf06 --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.artemiswrapper; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; +import org.apache.activemq.artemis.api.jms.management.JMSServerControl; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.BrokerService; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; + +public class OpenwireArtemisBaseTest { + + @Rule + public TemporaryFolder temporaryFolder; + @Rule + public TestName name = new TestName(); + + public OpenwireArtemisBaseTest() { + File tmpRoot = new File("./target/tmp"); + tmpRoot.mkdirs(); + temporaryFolder = new TemporaryFolder(tmpRoot); + //The wrapper stuff will automatically create a default + //server on a normal connection factory, which will + //cause problems with clustering tests, which starts + //all servers explicitly. Setting this to true + //can prevent the auto-creation from happening. + BrokerService.disableWrapper = true; + } + + + public String getTmp() { + return getTmpFile().getAbsolutePath(); + } + + public File getTmpFile() { + return temporaryFolder.getRoot(); + } + + protected String getJournalDir(int serverID, boolean backup) { + return getTmp() + "/journal_" + serverID + "_" + backup; + } + + protected String getBindingsDir(int serverID, boolean backup) { + return getTmp() + "/binding_" + serverID + "_" + backup; + } + + protected String getPageDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected String getLargeMessagesDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + public String CLUSTER_PASSWORD = "OPENWIRECLUSTER"; + + protected Configuration createConfig(final int serverID) throws Exception { + return createConfig("localhost", serverID); + } + + protected Configuration createConfig(final String hostAddress, final int serverID, final int port) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + configuration.addAcceptorConfiguration("netty", newURIwithPort(hostAddress, port)); + configuration.addConnectorConfiguration("netty-connector", newURIwithPort(hostAddress, port)); + + return configuration; + } + + protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID)); + configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID)); + + return configuration; + } + + //extraAcceptor takes form: "?name=value&name1=value ..." + protected Configuration createConfig(final int serverID, String extraAcceptorParams) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + String fullAcceptorUri = newURI(serverID) + extraAcceptorParams; + configuration.addAcceptorConfiguration("netty", fullAcceptorUri); + + configuration.addConnectorConfiguration("netty-connector", newURI(serverID)); + return configuration; + } + + public void deployClusterConfiguration(Configuration config, Integer ... targetIDs) throws Exception { + StringBuffer stringBuffer = new StringBuffer(); + String separator = ""; + for (int x : targetIDs) { + stringBuffer.append(separator + newURI(x)); + separator = ","; + } + + String ccURI = "static://(" + stringBuffer.toString() + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1"; + + config.addClusterConfiguration("clusterCC", ccURI); + } + + protected static String newURI(int serverID) { + return newURI("localhost", serverID); + } + + protected static String newURI(String localhostAddress, int serverID) { + return "tcp://" + localhostAddress + ":" + (61616 + serverID); + } + + protected static String newURIwithPort(String localhostAddress, int port) { + return "tcp://" + localhostAddress + ":" + port; + } + + public static JMSServerControl createJMSServerControl(final MBeanServer mbeanServer) throws Exception { + return (JMSServerControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSServerObjectName(), JMSServerControl.class, mbeanServer); + } + + public static JMSQueueControl createJMSQueueControl(final String name, + final MBeanServer mbeanServer) throws Exception { + return (JMSQueueControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(name), JMSQueueControl.class, mbeanServer); + } + + private static Object createProxy(final ObjectName objectName, + final Class mbeanInterface, + final MBeanServer mbeanServer) { + return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, objectName, mbeanInterface, false); + } + + protected void shutDownClusterServers(EmbeddedJMS[] servers) throws Exception { + for (int i = 0; i < servers.length; i++) { + try { + servers[i].stop(); + } + catch (Throwable t) { + t.printStackTrace(); + } + } + } + + protected void shutDownNonClusterServers(EmbeddedJMS[] servers) throws Exception { + shutDownClusterServers(servers); + } + + protected void setUpNonClusterServers(EmbeddedJMS[] servers) throws Exception { + + Configuration[] serverCfgs = new Configuration[servers.length]; + for (int i = 0; i < servers.length; i++) { + serverCfgs[i] = createConfig(i); + } + + for (int i = 0; i < servers.length; i++) { + servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl()); + } + + for (int i = 0; i < servers.length; i++) { + servers[i].start(); + } + } + + protected void setUpClusterServers(EmbeddedJMS[] servers) throws Exception { + + Configuration[] serverCfgs = new Configuration[servers.length]; + for (int i = 0; i < servers.length; i++) { + serverCfgs[i] = createConfig(i); + } + + for (int i = 0; i < servers.length; i++) { + deployClusterConfiguration(serverCfgs[i], getTargets(servers.length, i)); + } + + for (int i = 0; i < servers.length; i++) { + servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl()); + } + + for (int i = 0; i < servers.length; i++) { + servers[i].start(); + } + + for (int i = 0; i < servers.length; i++) { + Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length)); + } + } + + private Integer[] getTargets(int total, int self) + { + int lenTargets = total - self; + List<Integer> targets = new ArrayList<>(); + for (int i = 0; i < lenTargets; i++) { + if (i != self) { + targets.add(i); + } + } + return targets.toArray(new Integer[0]); + } + + public EmbeddedJMS createBroker() throws Exception { + Configuration config0 = createConfig(0); + EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + return newbroker; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 34babf8..0843d3a 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,6 +30,7 @@ import javax.net.SocketFactory; import org.apache.activemq.TransportLoggerSupport; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.*; import org.apache.activemq.util.IOExceptionSupport; @@ -54,11 +56,10 @@ public class TcpTransportFactory extends TransportFactory { //here check broker, if no broker, we start one Map<String, String> params = URISupport.parseParameters(location); String brokerId = params.remove("invmBrokerId"); - params.clear(); - location = URISupport.createRemainingURI(location, params); - if (brokerService == null) { + URI location1 = URISupport.createRemainingURI(location, Collections.EMPTY_MAP); + if (brokerService == null && !BrokerService.disableWrapper) { - ArtemisBrokerHelper.startArtemisBroker(location); + ArtemisBrokerHelper.startArtemisBroker(location1); brokerService = location.toString(); if (brokerId != null) { @@ -66,7 +67,8 @@ public class TcpTransportFactory extends TransportFactory { System.out.println("bound: " + brokerId); } } - return super.doConnect(location); + URI location2 = URISupport.createRemainingURI(location, params); + return super.doConnect(location2); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java deleted file mode 100644 index fd06de9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.io.InputStream; -import java.io.OutputStream; - -import javax.jms.Queue; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated -public class ActiveMQInputStreamTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class); - - private static final String BROKER_URL = "tcp://localhost:0"; - private static final String DESTINATION = "destination"; - private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash - - private BrokerService broker; - private String connectionUri; - - @Override - public void setUp() throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setPersistent(false); - broker.setDestinations(new ActiveMQDestination[]{ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),}); - broker.addConnector(BROKER_URL); - broker.start(); - broker.waitUntilStarted(); - - //some internal api we don't implement - connectionUri = broker.getDefaultUri(); - } - - @Override - public void tearDown() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - public void testInputStreamSetSyncSendOption() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true"); - - OutputStream out = null; - try { - out = connection.createOutputStream(destination); - - assertTrue(((ActiveMQOutputStream) out).isAlwaysSyncSend()); - - LOG.debug("writing..."); - for (int i = 0; i < STREAM_LENGTH; ++i) { - out.write(0); - } - LOG.debug("wrote " + STREAM_LENGTH + " bytes"); - } - finally { - if (out != null) { - out.close(); - } - } - - InputStream in = null; - try { - in = connection.createInputStream(destination); - LOG.debug("reading..."); - int count = 0; - while (-1 != in.read()) { - ++count; - } - LOG.debug("read " + count + " bytes"); - } - finally { - if (in != null) { - in.close(); - } - } - - connection.close(); - } - - public void testInputStreamMatchesDefaultChuckSize() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(DESTINATION); - - OutputStream out = null; - try { - out = connection.createOutputStream(destination); - LOG.debug("writing..."); - for (int i = 0; i < STREAM_LENGTH; ++i) { - out.write(0); - } - LOG.debug("wrote " + STREAM_LENGTH + " bytes"); - } - finally { - if (out != null) { - out.close(); - } - } - - InputStream in = null; - try { - in = connection.createInputStream(destination); - LOG.debug("reading..."); - int count = 0; - while (-1 != in.read()) { - ++count; - } - LOG.debug("read " + count + " bytes"); - } - finally { - if (in != null) { - in.close(); - } - } - - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java new file mode 100644 index 0000000..f47620f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.TestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Enforces a test case to run for only an allotted time to prevent them from + * hanging and breaking the whole testing. + * + * + */ + +public abstract class AutoFailTestSupport extends TestCase { + public static final int EXIT_SUCCESS = 0; + public static final int EXIT_ERROR = 1; + private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class); + + private long maxTestTime = 5 * 60 * 1000; // 5 mins by default + private Thread autoFailThread; + + private boolean verbose = true; + private boolean useAutoFail; // Disable auto fail by default + private AtomicBoolean isTestSuccess; + + protected void setUp() throws Exception { + // Runs the auto fail thread before performing any setup + if (isAutoFail()) { + startAutoFailThread(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + + // Stops the auto fail thread only after performing any clean up + stopAutoFailThread(); + } + + /** + * Manually start the auto fail thread. To start it automatically, just set + * the auto fail to true before calling any setup methods. As a rule, this + * method is used only when you are not sure, if the setUp and tearDown + * method is propagated correctly. + */ + public void startAutoFailThread() { + setAutoFail(true); + isTestSuccess = new AtomicBoolean(false); + autoFailThread = new Thread(new Runnable() { + public void run() { + try { + // Wait for test to finish succesfully + Thread.sleep(getMaxTestTime()); + } catch (InterruptedException e) { + // This usually means the test was successful + } finally { + // Check if the test was able to tear down succesfully, + // which usually means, it has finished its run. + if (!isTestSuccess.get()) { + LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms."); + dumpAllThreads(getName()); + if (System.getProperty("org.apache.activemq.AutoFailTestSupport.disableSystemExit") == null) { + System.exit(EXIT_ERROR); + } else { + LOG.error("No system.exit as it kills surefire - forkedProcessTimeoutInSeconds (surefire.timeout) will kick in eventually see pom.xml surefire plugin config"); + } + } + } + } + }, "AutoFailThread"); + + if (verbose) { + LOG.info("Starting auto fail thread..."); + } + + LOG.info("Starting auto fail thread..."); + autoFailThread.start(); + } + + /** + * Manually stops the auto fail thread. As a rule, this method is used only + * when you are not sure, if the setUp and tearDown method is propagated + * correctly. + */ + public void stopAutoFailThread() { + if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) { + isTestSuccess.set(true); + + if (verbose) { + LOG.info("Stopping auto fail thread..."); + } + + LOG.info("Stopping auto fail thread..."); + autoFailThread.interrupt(); + } + } + + /** + * Sets the auto fail value. As a rule, this should be used only before any + * setup methods is called to automatically enable the auto fail thread in + * the setup method of the test case. + * + * @param val + */ + public void setAutoFail(boolean val) { + this.useAutoFail = val; + } + + public boolean isAutoFail() { + return this.useAutoFail; + } + + /** + * The assigned value will only be reflected when the auto fail thread has + * started its run. Value is in milliseconds. + * + * @param val + */ + public void setMaxTestTime(long val) { + this.maxTestTime = val; + } + + public long getMaxTestTime() { + return this.maxTestTime; + } + + + public static void dumpAllThreads(String prefix) { + Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces(); + for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) { + System.err.println(prefix + " " + stackEntry.getKey()); + for(StackTraceElement element : stackEntry.getValue()) { + System.err.println(" " + element); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java index 5e5b993..b8397e2 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java @@ -52,22 +52,50 @@ public class ConnectionCleanupTest extends TestCase { try { connection.setClientID("test"); - // fail("Should have received JMSException"); + fail("Should have received JMSException"); } catch (JMSException e) { } - connection.cleanup(); + connection.doCleanup(true); connection.setClientID("test"); connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try { connection.setClientID("test"); - // fail("Should have received JMSException"); + fail("Should have received JMSException"); } catch (JMSException e) { } } + public void testChangeClientIDDenied() throws JMSException { + + connection.setClientID("test"); + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + + connection.cleanup(); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + fail("Should have received JMSException"); + } catch (JMSException e) { + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java index fa58ebe..b8dea70 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java @@ -16,15 +16,23 @@ */ package org.apache.activemq; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.junit.rules.TemporaryFolder; import org.springframework.jms.core.JmsTemplate; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import java.io.File; /** * A useful base class which creates and closes an embedded broker @@ -32,17 +40,26 @@ import javax.jms.Destination; public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { protected BrokerService broker; - // protected String bindAddress = "tcp://localhost:61616"; - protected String bindAddress = "vm://localhost"; + protected EmbeddedJMS artemisBroker; + protected String bindAddress = "tcp://localhost:61616"; protected ConnectionFactory connectionFactory; protected boolean useTopic; protected ActiveMQDestination destination; protected JmsTemplate template; - @Override + public TemporaryFolder temporaryFolder; + + public String CLUSTER_PASSWORD = "OPENWIRECLUSTER"; + protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); + BrokerService.disableWrapper = true; + File tmpRoot = new File("./target/tmp"); + tmpRoot.mkdirs(); + temporaryFolder = new TemporaryFolder(tmpRoot); + temporaryFolder.create(); + + if (artemisBroker == null) { + artemisBroker = createArtemisBroker(); } startBroker(); @@ -58,13 +75,42 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { @Override protected void tearDown() throws Exception { - if (broker != null) { + if (artemisBroker != null) { try { - broker.stop(); + artemisBroker.stop(); } catch (Exception e) { } } + temporaryFolder.delete(); + } + + public String getTmp() { + return getTmpFile().getAbsolutePath(); + } + + public File getTmpFile() { + return temporaryFolder.getRoot(); + } + + protected String getJournalDir(int serverID, boolean backup) { + return getTmp() + "/journal_" + serverID + "_" + backup; + } + + protected String getBindingsDir(int serverID, boolean backup) { + return getTmp() + "/binding_" + serverID + "_" + backup; + } + + protected String getPageDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected String getLargeMessagesDir(int serverID, boolean backup) { + return getTmp() + "/paging_" + serverID + "_" + backup; + } + + protected static String newURI(String localhostAddress, int serverID) { + return "tcp://" + localhostAddress + ":" + (61616 + serverID); } /** @@ -114,20 +160,44 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { return new ActiveMQConnectionFactory(bindAddress); } - /** - * Factory method to create a new broker - * - * @throws Exception - */ + + public EmbeddedJMS createArtemisBroker() throws Exception { + Configuration config0 = createConfig("localhost", 0); + EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + return newbroker; + } + + protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception { + ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). + setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). + setJournalDirectory(getJournalDir(serverID, false)). + setBindingsDirectory(getBindingsDir(serverID, false)). + setPagingDirectory(getPageDir(serverID, false)). + setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)). + setJournalCompactMinFiles(0). + setJournalCompactPercentage(0). + setClusterPassword(CLUSTER_PASSWORD); + + configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); + + configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID)); + configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID)); + + return configuration; + } + + //we keep this because some other tests uses it. + //we'll delete this when those tests are dealt with. protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); answer.setPersistent(isPersistent()); + answer.getManagementContext().setCreateConnector(false); answer.addConnector(bindAddress); return answer; } protected void startBroker() throws Exception { - broker.start(); + artemisBroker.start(); } /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java new file mode 100755 index 0000000..b7c2e94 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.ArrayList; +import java.util.Enumeration; + +import org.apache.activemq.test.JmsResourceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsQueueTransactionTest extends JmsTransactionTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class); + + /** + * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider() + */ + protected JmsResourceProvider getJmsResourceProvider() { + JmsResourceProvider p = new JmsResourceProvider(); + p.setTopic(false); + return p; + } + + /** + * Tests if the the connection gets reset, the messages will still be + * received. + * + * @throws Exception + */ + public void testReceiveTwoThenCloseConnection() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from previous test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList<Message> messages = new ArrayList<Message>(); + beginTx(); + Message message = consumer.receive(2000); + assertEquals(outbound[0], message); + + message = consumer.receive(2000); + assertNotNull(message); + assertEquals(outbound[1], message); + + // Close and reopen connection. + reconnect(); + + // Consume again.. the previous message should + // get redelivered. + beginTx(); + message = consumer.receive(2000); + assertNotNull("Should have re-received the first message again!", message); + messages.add(message); + assertEquals(outbound[0], message); + + message = consumer.receive(5000); + assertNotNull("Should have re-received the second message again!", message); + messages.add(message); + assertEquals(outbound[1], message); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Tests sending and receiving messages with two sessions(one for producing + * and another for consuming). + * + * @throws Exception + */ + public void testSendReceiveInSeperateSessionTest() throws Exception { + session.close(); + int batchCount = 10; + + for (int i = 0; i < batchCount; i++) { + // Session that sends messages + { + Session session = resourceProvider.createSession(connection); + this.session = session; + MessageProducer producer = resourceProvider.createProducer(session, destination); + // consumer = resourceProvider.createConsumer(session, + // destination); + beginTx(); + producer.send(session.createTextMessage("Test Message: " + i)); + commitTx(); + session.close(); + } + + // Session that consumes messages + { + Session session = resourceProvider.createSession(connection); + this.session = session; + MessageConsumer consumer = resourceProvider.createConsumer(session, destination); + + beginTx(); + TextMessage message = (TextMessage)consumer.receive(1000 * 5); + assertNotNull("Received only " + i + " messages in batch ", message); + assertEquals("Test Message: " + i, message.getText()); + + commitTx(); + session.close(); + } + } + } + + /** + * Tests the queue browser. Browses the messages then the consumer tries to + * receive them. The messages should still be in the queue even when it was + * browsed. + * + * @throws Exception + */ + public void testReceiveBrowseReceive() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")}; + + // lets consume any outstanding messages from previous test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + producer.send(outbound[2]); + commitTx(); + + // Get the first. + beginTx(); + assertEquals(outbound[0], consumer.receive(1000)); + consumer.close(); + commitTx(); + + beginTx(); + QueueBrowser browser = session.createBrowser((Queue)destination); + Enumeration enumeration = browser.getEnumeration(); + + // browse the second + assertTrue("should have received the second message", enumeration.hasMoreElements()); + assertEquals(outbound[1], (Message)enumeration.nextElement()); + + // browse the third. + assertTrue("Should have received the third message", enumeration.hasMoreElements()); + assertEquals(outbound[2], (Message)enumeration.nextElement()); + + LOG.info("Check for more..."); + // There should be no more. + boolean tooMany = false; + while (enumeration.hasMoreElements()) { + LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText()); + tooMany = true; + } + assertFalse(tooMany); + LOG.info("close browser..."); + browser.close(); + + LOG.info("reopen and consume..."); + // Re-open the consumer. + consumer = resourceProvider.createConsumer(session, destination); + // Receive the second. + assertEquals(outbound[1], consumer.receive(1000)); + // Receive the third. + assertEquals(outbound[2], consumer.receive(1000)); + consumer.close(); + + commitTx(); + } + + public void testCloseConsumer() throws Exception { + Destination dest = session.createQueue(getSubject() + "?consumer.prefetchSize=0"); + producer = session.createProducer(dest); + beginTx(); + producer.send(session.createTextMessage("message 1")); + producer.send(session.createTextMessage("message 2")); + commitTx(); + + beginTx(); + consumer = session.createConsumer(dest); + Message message1 = consumer.receive(1000); + String text1 = ((TextMessage)message1).getText(); + assertNotNull(message1); + assertEquals("message 1", text1); + + consumer.close(); + + consumer = session.createConsumer(dest); + + Message message2 = consumer.receive(1000); + String text2 = ((TextMessage)message2).getText(); + assertNotNull(message2); + assertEquals("message 2", text2); + commitTx(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java new file mode 100755 index 0000000..dfcf302 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java @@ -0,0 +1,721 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.test.JmsResourceProvider; +import org.apache.activemq.test.TestSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener { + + private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class); + private static final int MESSAGE_COUNT = 5; + private static final String MESSAGE_TEXT = "message"; + + protected ConnectionFactory connectionFactory; + protected Connection connection; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected JmsResourceProvider resourceProvider; + protected Destination destination; + protected int batchCount = 10; + protected int batchSize = 20; + protected BrokerService broker; + + // for message listener test + private final List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT); + private final List<Message> ackMessages = new ArrayList<Message>(MESSAGE_COUNT); + private boolean resendPhase; + + public JmsTransactionTestSupport() { + super(); + } + + public JmsTransactionTestSupport(String name) { + super(name); + } + + /* + * (non-Javadoc) + * + * @see junit.framework.TestCase#setUp() + */ + @Override + protected void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + resourceProvider = getJmsResourceProvider(); + topic = resourceProvider.isTopic(); + // We will be using transacted sessions. + setSessionTransacted(); + connectionFactory = newConnectionFactory(); + reconnect(); + } + + protected void setSessionTransacted() { + resourceProvider.setTransacted(true); + } + + protected ConnectionFactory newConnectionFactory() throws Exception { + return resourceProvider.createConnectionFactory(); + } + + protected void beginTx() throws Exception { + //no-op for local tx + } + + protected void commitTx() throws Exception { + session.commit(); + } + + protected void rollbackTx() throws Exception { + session.rollback(); + } + + /** + */ + protected BrokerService createBroker() throws Exception, URISyntaxException { + return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); + } + + /* + * (non-Javadoc) + * + * @see junit.framework.TestCase#tearDown() + */ + @Override + protected void tearDown() throws Exception { + LOG.info("Closing down connection"); + + try { + session.close(); + session = null; + connection.close(); + connection = null; + } catch (Exception e) { + LOG.info("Caught exception while closing resources."); + } + + try { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } catch (Exception e) { + LOG.info("Caught exception while shutting down the Broker", e); + } + + LOG.info("Connection closed."); + } + + protected abstract JmsResourceProvider getJmsResourceProvider(); + + /** + * Sends a batch of messages and validates that the messages are received. + * + * @throws Exception + */ + public void testSendReceiveTransactedBatches() throws Exception { + + TextMessage message = session.createTextMessage("Batch Message"); + for (int j = 0; j < batchCount; j++) { + LOG.info("Producing bacth " + j + " of " + batchSize + " messages"); + + beginTx(); + for (int i = 0; i < batchSize; i++) { + producer.send(message); + } + messageSent(); + commitTx(); + LOG.info("Consuming bacth " + j + " of " + batchSize + " messages"); + + beginTx(); + for (int i = 0; i < batchSize; i++) { + message = (TextMessage)consumer.receive(1000 * 5); + assertNotNull("Received only " + i + " messages in batch " + j, message); + assertEquals("Batch Message", message.getText()); + } + + commitTx(); + } + } + + protected void messageSent() throws Exception { + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. + * + * @throws Exception + */ + public void testSendRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + rollbackTx(); + + // sends a message + beginTx(); + producer.send(outbound[1]); + commitTx(); + + // receives the first message + beginTx(); + ArrayList<Message> messages = new ArrayList<Message>(); + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * spec section 3.6 acking a message with automation acks has no effect. + * @throws Exception + */ + public void testAckMessageInTx() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + outbound[0].acknowledge(); + commitTx(); + outbound[0].acknowledge(); + + // receives the first message + beginTx(); + ArrayList<Message> messages = new ArrayList<Message>(); + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Message not delivered.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the message sent before + * session close is not consumed. + * + * This test only works with local transactions, not xa. + * @throws Exception + */ + public void testSendSessionClose() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + consumer.close(); + + reconnectSession(); + + // sends a message + producer.send(outbound[1]); + commitTx(); + + // receives the first message + ArrayList<Message> messages = new ArrayList<Message>(); + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the message sent before + * session close is not consumed. + * + * @throws Exception + */ + public void testSendSessionAndConnectionClose() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // sends a message + beginTx(); + producer.send(outbound[0]); + commitTx(); + + // sends a message that gets rollbacked + beginTx(); + producer.send(session.createTextMessage("I'm going to get rolled back.")); + consumer.close(); + session.close(); + + reconnect(); + + // sends a message + beginTx(); + producer.send(outbound[1]); + commitTx(); + + // receives the first message + ArrayList<Message> messages = new ArrayList<Message>(); + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + LOG.info("Received: " + message); + + // receives the second message + LOG.info("About to consume message 2"); + message = consumer.receive(4000); + messages.add(message); + LOG.info("Received: " + message); + + // validates that the rollbacked was not consumed + commitTx(); + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * redelivered. + * + * @throws Exception + */ + public void testReceiveRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + // sent both messages + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList<Message> messages = new ArrayList<Message>(); + beginTx(); + Message message = consumer.receive(1000); + messages.add(message); + assertEquals(outbound[0], message); + commitTx(); + + // rollback so we can get that last message again. + beginTx(); + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(outbound[1], message); + rollbackTx(); + + // Consume again.. the prev message should + // get redelivered. + beginTx(); + message = consumer.receive(5000); + assertNotNull("Should have re-received the message again!", message); + messages.add(message); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * redelivered. + * + * @throws Exception + */ + public void testReceiveTwoThenRollback() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + // + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList<Message> messages = new ArrayList<Message>(); + beginTx(); + Message message = consumer.receive(1000); + assertEquals(outbound[0], message); + + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(outbound[1], message); + rollbackTx(); + + // Consume again.. the prev message should + // get redelivered. + beginTx(); + message = consumer.receive(5000); + assertNotNull("Should have re-received the first message again!", message); + messages.add(message); + assertEquals(outbound[0], message); + message = consumer.receive(5000); + assertNotNull("Should have re-received the second message again!", message); + messages.add(message); + assertEquals(outbound[1], message); + + assertNull(consumer.receiveNoWait()); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. + * + * @throws Exception + */ + public void testSendReceiveWithPrefetchOne() throws Exception { + setPrefetchToOne(); + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"), + session.createTextMessage("Fourth Message")}; + + beginTx(); + for (int i = 0; i < outbound.length; i++) { + // sends a message + producer.send(outbound[i]); + } + commitTx(); + + // receives the first message + beginTx(); + for (int i = 0; i < outbound.length; i++) { + LOG.info("About to consume message 1"); + Message message = consumer.receive(1000); + assertNotNull(message); + LOG.info("Received: " + message); + } + + // validates that the rollbacked was not consumed + commitTx(); + } + + /** + * Perform the test that validates if the rollbacked message was redelivered + * multiple times. + * + * @throws Exception + */ + public void testReceiveTwoThenRollbackManyTimes() throws Exception { + for (int i = 0; i < 5; i++) { + testReceiveTwoThenRollback(); + } + } + + /** + * Sends a batch of messages and validates that the rollbacked message was + * not consumed. This test differs by setting the message prefetch to one. + * + * @throws Exception + */ + public void testSendRollbackWithPrefetchOfOne() throws Exception { + setPrefetchToOne(); + testSendRollback(); + } + + /** + * Sends a batch of messages and and validates that the rollbacked message + * was redelivered. This test differs by setting the message prefetch to + * one. + * + * @throws Exception + */ + public void testReceiveRollbackWithPrefetchOfOne() throws Exception { + setPrefetchToOne(); + testReceiveRollback(); + } + + /** + * Tests if the messages can still be received if the consumer is closed + * (session is not closed). + * + * @throws Exception see http://jira.codehaus.org/browse/AMQ-143 + */ + public void testCloseConsumerBeforeCommit() throws Exception { + TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receiveNoWait() != null) { + } + + commitTx(); + + // sends the messages + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + beginTx(); + TextMessage message = (TextMessage)consumer.receive(1000); + assertEquals(outbound[0].getText(), message.getText()); + // Close the consumer before the commit. This should not cause the + // received message + // to rollback. + consumer.close(); + commitTx(); + + // Create a new consumer + consumer = resourceProvider.createConsumer(session, destination); + LOG.info("Created consumer: " + consumer); + + beginTx(); + message = (TextMessage)consumer.receive(1000); + assertEquals(outbound[1].getText(), message.getText()); + commitTx(); + } + + public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception { + ArrayList<String> list = new ArrayList<String>(); + list.add("First"); + Message outbound = session.createObjectMessage(list); + outbound.setStringProperty("foo", "abc"); + + beginTx(); + producer.send(outbound); + commitTx(); + + LOG.info("About to consume message 1"); + beginTx(); + Message message = consumer.receive(5000); + + List<String> body = assertReceivedObjectMessageWithListBody(message); + + // now lets try mutate it + try { + message.setStringProperty("foo", "def"); + fail("Cannot change properties of the object!"); + } catch (JMSException e) { + LOG.info("Caught expected exception: " + e, e); + } + body.clear(); + body.add("This should never be seen!"); + rollbackTx(); + + beginTx(); + message = consumer.receive(5000); + List<String> secondBody = assertReceivedObjectMessageWithListBody(message); + assertNotSame("Second call should return a different body", secondBody, body); + commitTx(); + } + + @SuppressWarnings("unchecked") + protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException { + assertNotNull("Should have received a message!", message); + assertEquals("foo header", "abc", message.getStringProperty("foo")); + + assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage); + ObjectMessage objectMessage = (ObjectMessage)message; + List<String> body = (List<String>)objectMessage.getObject(); + LOG.info("Received body: " + body); + + assertEquals("Size of list should be 1", 1, body.size()); + assertEquals("element 0 of list", "First", body.get(0)); + return body; + } + + /** + * Recreates the connection. + * + * @throws javax.jms.JMSException + */ + protected void reconnect() throws Exception { + + if (connection != null) { + // Close the prev connection. + connection.close(); + } + session = null; + connection = resourceProvider.createConnection(connectionFactory); + reconnectSession(); + connection.start(); + } + + /** + * Recreates the connection. + * + * @throws javax.jms.JMSException + */ + protected void reconnectSession() throws JMSException { + if (session != null) { + session.close(); + } + + session = resourceProvider.createSession(connection); + destination = resourceProvider.createDestination(session, getSubject()); + producer = resourceProvider.createProducer(session, destination); + consumer = resourceProvider.createConsumer(session, destination); + } + + /** + * Sets the prefeftch policy to one. + */ + protected void setPrefetchToOne() { + ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy(); + prefetchPolicy.setQueuePrefetch(1); + prefetchPolicy.setTopicPrefetch(1); + prefetchPolicy.setDurableTopicPrefetch(1); + prefetchPolicy.setOptimizeDurableTopicPrefetch(1); + } + + protected ActiveMQPrefetchPolicy getPrefetchPolicy() { + return ((ActiveMQConnection)connection).getPrefetchPolicy(); + } + + //This test won't work with xa tx so no beginTx() has been added. + public void testMessageListener() throws Exception { + // send messages + for (int i = 0; i < MESSAGE_COUNT; i++) { + producer.send(session.createTextMessage(MESSAGE_TEXT + i)); + } + commitTx(); + consumer.setMessageListener(this); + // wait receive + waitReceiveUnack(); + assertEquals(unackMessages.size(), MESSAGE_COUNT); + // resend phase + waitReceiveAck(); + assertEquals(ackMessages.size(), MESSAGE_COUNT); + // should no longer re-receive + consumer.setMessageListener(null); + assertNull(consumer.receive(500)); + reconnect(); + } + + @Override + public void onMessage(Message message) { + if (!resendPhase) { + unackMessages.add(message); + if (unackMessages.size() == MESSAGE_COUNT) { + try { + rollbackTx(); + resendPhase = true; + } catch (Exception e) { + e.printStackTrace(); + } + } + } else { + ackMessages.add(message); + if (ackMessages.size() == MESSAGE_COUNT) { + try { + commitTx(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + private void waitReceiveUnack() throws Exception { + for (int i = 0; i < 100 && !resendPhase; i++) { + Thread.sleep(100); + } + assertTrue(resendPhase); + } + + private void waitReceiveAck() throws Exception { + for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) { + Thread.sleep(100); + } + assertFalse(ackMessages.size() < MESSAGE_COUNT); + } +}