Repository: activemq-artemis Updated Branches: refs/heads/master 25fc24dfa -> 029c11263
ARTEMIS-210 outbound RA connection load-balancing Inbound sessions are always created from the same ActiveMQConnectionFactory which means the load-balancing policy is applied to them in the expected manner. However, outbound sessions are created from independent, unique ActiveMQConnectionFactory instances which means that the load-balancing doesn't follow the expected pattern. This commit changes this behavior by caching each unique ActiveMQConnectionFactory instance and using it for both inbound and outbound sessions potentially. This ensures the sessions are load-balanced as expected. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7a1199c4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7a1199c4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7a1199c4 Branch: refs/heads/master Commit: 7a1199c4759a8b56ccb28e96f44b3ba760ca5316 Parents: 25fc24d Author: jbertram <jbert...@apache.org> Authored: Mon Aug 24 17:14:53 2015 -0500 Committer: jbertram <jbert...@apache.org> Committed: Mon Aug 24 18:24:04 2015 -0500 ---------------------------------------------------------------------- .../core/client/impl/ServerLocatorImpl.java | 10 + .../artemis/ra/ActiveMQRAManagedConnection.java | 3 + .../artemis/ra/ActiveMQResourceAdapter.java | 190 ++++++----- .../artemis/ra/ConnectionFactoryProperties.java | 313 +++++++++++++++++++ .../integration/ra/ActiveMQClusteredTest.java | 69 +++- .../integration/ra/OutgoingConnectionTest.java | 52 +++ .../ra/ConnectionFactoryPropertiesTest.java | 86 +++++ 7 files changed, 638 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a1199c4/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 708c087..deb17da 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -570,6 +570,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery // if the topologyArray is null, we will use the initialConnectors if (usedTopology != null) { + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { + ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy."); + } int pos = loadBalancingPolicy.select(usedTopology.length); Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos]; @@ -577,6 +580,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } else { // Get from initialconnectors + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { + ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors."); + } int pos = loadBalancingPolicy.select(initialConnectors.length); @@ -1753,4 +1759,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery ServerLocatorImpl clone = new ServerLocatorImpl(this); return clone; } + + public boolean isReceivedToplogy() { + return receivedTopology; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a1199c4/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java index 010467e..dc84ace 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java @@ -807,4 +807,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc this.inManagedTx = inManagedTx; } + public ActiveMQConnectionFactory getConnectionFactory() { + return connectionFactory; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a1199c4/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java index 8768265..44adac8 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java @@ -122,6 +122,13 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { private String entries; /** + * Keep track of the connection factories that we create so we don't create a bunch of instances of factories + * configured the exact same way. Using the same connection factory instance also makes connection load-balancing + * behave as expected for outbound connections. + */ + private final Map<ConnectionFactoryProperties, ActiveMQConnectionFactory> knownConnectionFactories = new HashMap<ConnectionFactoryProperties, ActiveMQConnectionFactory>(); + + /** * Constructor */ public ActiveMQResourceAdapter() { @@ -267,6 +274,8 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { managedConnectionFactories.clear(); + knownConnectionFactories.clear(); + if (defaultActiveMQConnectionFactory != null) { defaultActiveMQConnectionFactory.close(); } @@ -1586,107 +1595,124 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable { public ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) { ActiveMQConnectionFactory cf; - List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); + boolean known = false; - String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); + synchronized (knownConnectionFactories) { + if (!knownConnectionFactories.keySet().contains(overrideProperties)) { + List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames(); - Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); + String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress(); - String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); + Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA(); - String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); + String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile(); - String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); + String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName(); - if (ha == null) { - ha = ActiveMQClient.DEFAULT_IS_HA; - } + String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass(); - if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { - BroadcastEndpointFactory endpointFactory = null; - - if (jgroupsLocatorClassName != null) { - String jchannelRefName = raProperties.getJgroupsChannelRefName(); - JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); - endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); - } - else if (discoveryAddress != null) { - Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); - if (discoveryPort == null) { - discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; + if (ha == null) { + ha = ActiveMQClient.DEFAULT_IS_HA; } - String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); - endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); - } - else if (jgroupsFileName != null) { - endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); - } - Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); - if (refreshTimeout == null) { - refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; - } - - Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); - - if (initialTimeout == null) { - initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; - } - - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); - - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); - } - - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); - } - else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); - } - } - else if (connectorClassName != null) { - TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; - - List<Map<String, Object>> connectionParams; - if (overrideProperties.getParsedConnectorClassNames() != null) { - connectionParams = overrideProperties.getParsedConnectionParameters(); - } - else { - connectionParams = raProperties.getParsedConnectionParameters(); - } - - for (int i = 0; i < connectorClassName.size(); i++) { - TransportConfiguration tc; - if (connectionParams == null || i >= connectionParams.size()) { - tc = new TransportConfiguration(connectorClassName.get(i)); - ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); + if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) { + BroadcastEndpointFactory endpointFactory = null; + + if (jgroupsLocatorClassName != null) { + String jchannelRefName = raProperties.getJgroupsChannelRefName(); + JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName); + endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel); + } + else if (discoveryAddress != null) { + Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort(); + if (discoveryPort == null) { + discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT; + } + + String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); + endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1); + } + else if (jgroupsFileName != null) { + endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName); + } + Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout(); + if (refreshTimeout == null) { + refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT; + } + + Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout(); + + if (initialTimeout == null) { + initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; + } + + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory); + + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha); + } + + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF); + } + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF); + } + } + else if (connectorClassName != null) { + TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()]; + + List<Map<String, Object>> connectionParams; + if (overrideProperties.getParsedConnectorClassNames() != null) { + connectionParams = overrideProperties.getParsedConnectionParameters(); + } + else { + connectionParams = raProperties.getParsedConnectionParameters(); + } + + for (int i = 0; i < connectorClassName.size(); i++) { + TransportConfiguration tc; + if (connectionParams == null || i >= connectionParams.size()) { + tc = new TransportConfiguration(connectorClassName.get(i)); + ActiveMQRALogger.LOGGER.debug("No connector params provided using default"); + } + else { + tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); + } + + transportConfigurations[i] = tc; + } + + if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { + ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + + Arrays.toString(transportConfigurations) + " with ha=" + ha); + } + + if (ha) { + cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); + } + else { + cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); + } } else { - tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i)); + throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); } - transportConfigurations[i] = tc; - } - - if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" + - Arrays.toString(transportConfigurations) + " with ha=" + ha); - } - - if (ha) { - cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations); + setParams(cf, overrideProperties); + knownConnectionFactories.put(overrideProperties, cf); } else { - cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations); + cf = knownConnectionFactories.get(overrideProperties); + known = true; } } - else { - throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory"); + + if (known && cf.getServerLocator().isClosed()) { + knownConnectionFactories.remove(overrideProperties); + cf = createActiveMQConnectionFactory(overrideProperties); } - setParams(cf, overrideProperties); return cf; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a1199c4/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java index 770dd3a..9edd1d8 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ConnectionFactoryProperties.java @@ -682,4 +682,317 @@ public class ConnectionFactoryProperties { public boolean isHasBeenUpdated() { return hasBeenUpdated; } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ConnectionFactoryProperties other = (ConnectionFactoryProperties) obj; + if (this.autoGroup == null) { + if (other.autoGroup != null) + return false; + } + else if (!this.autoGroup.equals(other.autoGroup)) + return false; + if (this.blockOnAcknowledge == null) { + if (other.blockOnAcknowledge != null) + return false; + } + else if (!this.blockOnAcknowledge.equals(other.blockOnAcknowledge)) + return false; + if (this.blockOnDurableSend == null) { + if (other.blockOnDurableSend != null) + return false; + } + else if (!this.blockOnDurableSend.equals(other.blockOnDurableSend)) + return false; + if (this.blockOnNonDurableSend == null) { + if (other.blockOnNonDurableSend != null) + return false; + } + else if (!this.blockOnNonDurableSend.equals(other.blockOnNonDurableSend)) + return false; + if (this.cacheLargeMessagesClient == null) { + if (other.cacheLargeMessagesClient != null) + return false; + } + else if (!this.cacheLargeMessagesClient.equals(other.cacheLargeMessagesClient)) + return false; + if (this.compressLargeMessage == null) { + if (other.compressLargeMessage != null) + return false; + } + else if (!this.compressLargeMessage.equals(other.compressLargeMessage)) + return false; + if (this.failoverOnInitialConnection == null) { + if (other.failoverOnInitialConnection != null) + return false; + } + else if (!this.failoverOnInitialConnection.equals(other.failoverOnInitialConnection)) + return false; + if (this.ha == null) { + if (other.ha != null) + return false; + } + else if (!this.ha.equals(other.ha)) + return false; + if (this.preAcknowledge == null) { + if (other.preAcknowledge != null) + return false; + } + else if (!this.preAcknowledge.equals(other.preAcknowledge)) + return false; + if (this.callFailoverTimeout == null) { + if (other.callFailoverTimeout != null) + return false; + } + else if (!this.callFailoverTimeout.equals(other.callFailoverTimeout)) + return false; + if (this.callTimeout == null) { + if (other.callTimeout != null) + return false; + } + else if (!this.callTimeout.equals(other.callTimeout)) + return false; + if (this.clientFailureCheckPeriod == null) { + if (other.clientFailureCheckPeriod != null) + return false; + } + else if (!this.clientFailureCheckPeriod.equals(other.clientFailureCheckPeriod)) + return false; + if (this.clientID == null) { + if (other.clientID != null) + return false; + } + else if (!this.clientID.equals(other.clientID)) + return false; + if (this.confirmationWindowSize == null) { + if (other.confirmationWindowSize != null) + return false; + } + else if (!this.confirmationWindowSize.equals(other.confirmationWindowSize)) + return false; + if (this.connectionLoadBalancingPolicyClassName == null) { + if (other.connectionLoadBalancingPolicyClassName != null) + return false; + } + else if (!this.connectionLoadBalancingPolicyClassName.equals(other.connectionLoadBalancingPolicyClassName)) + return false; + if (this.connectionTTL == null) { + if (other.connectionTTL != null) + return false; + } + else if (!this.connectionTTL.equals(other.connectionTTL)) + return false; + if (this.consumerMaxRate == null) { + if (other.consumerMaxRate != null) + return false; + } + else if (!this.consumerMaxRate.equals(other.consumerMaxRate)) + return false; + if (this.consumerWindowSize == null) { + if (other.consumerWindowSize != null) + return false; + } + else if (!this.consumerWindowSize.equals(other.consumerWindowSize)) + return false; + if (this.discoveryAddress == null) { + if (other.discoveryAddress != null) + return false; + } + else if (!this.discoveryAddress.equals(other.discoveryAddress)) + return false; + if (this.discoveryInitialWaitTimeout == null) { + if (other.discoveryInitialWaitTimeout != null) + return false; + } + else if (!this.discoveryInitialWaitTimeout.equals(other.discoveryInitialWaitTimeout)) + return false; + if (this.discoveryLocalBindAddress == null) { + if (other.discoveryLocalBindAddress != null) + return false; + } + else if (!this.discoveryLocalBindAddress.equals(other.discoveryLocalBindAddress)) + return false; + if (this.discoveryPort == null) { + if (other.discoveryPort != null) + return false; + } + else if (!this.discoveryPort.equals(other.discoveryPort)) + return false; + if (this.discoveryRefreshTimeout == null) { + if (other.discoveryRefreshTimeout != null) + return false; + } + else if (!this.discoveryRefreshTimeout.equals(other.discoveryRefreshTimeout)) + return false; + if (this.dupsOKBatchSize == null) { + if (other.dupsOKBatchSize != null) + return false; + } + else if (!this.dupsOKBatchSize.equals(other.dupsOKBatchSize)) + return false; + if (this.groupID == null) { + if (other.groupID != null) + return false; + } + else if (!this.groupID.equals(other.groupID)) + return false; + if (this.initialConnectAttempts == null) { + if (other.initialConnectAttempts != null) + return false; + } + else if (!this.initialConnectAttempts.equals(other.initialConnectAttempts)) + return false; + if (this.initialMessagePacketSize == null) { + if (other.initialMessagePacketSize != null) + return false; + } + else if (!this.initialMessagePacketSize.equals(other.initialMessagePacketSize)) + return false; + if (this.jgroupsChannelName == null) { + if (other.jgroupsChannelName != null) + return false; + } + else if (!this.jgroupsChannelName.equals(other.jgroupsChannelName)) + return false; + if (this.jgroupsFile == null) { + if (other.jgroupsFile != null) + return false; + } + else if (!this.jgroupsFile.equals(other.jgroupsFile)) + return false; + if (this.maxRetryInterval == null) { + if (other.maxRetryInterval != null) + return false; + } + else if (!this.maxRetryInterval.equals(other.maxRetryInterval)) + return false; + if (this.minLargeMessageSize == null) { + if (other.minLargeMessageSize != null) + return false; + } + else if (!this.minLargeMessageSize.equals(other.minLargeMessageSize)) + return false; + if (this.producerMaxRate == null) { + if (other.producerMaxRate != null) + return false; + } + else if (!this.producerMaxRate.equals(other.producerMaxRate)) + return false; + if (this.producerWindowSize == null) { + if (other.producerWindowSize != null) + return false; + } + else if (!this.producerWindowSize.equals(other.producerWindowSize)) + return false; + if (this.reconnectAttempts == null) { + if (other.reconnectAttempts != null) + return false; + } + else if (!this.reconnectAttempts.equals(other.reconnectAttempts)) + return false; + if (this.retryInterval == null) { + if (other.retryInterval != null) + return false; + } + else if (!this.retryInterval.equals(other.retryInterval)) + return false; + if (this.retryIntervalMultiplier == null) { + if (other.retryIntervalMultiplier != null) + return false; + } + else if (!this.retryIntervalMultiplier.equals(other.retryIntervalMultiplier)) + return false; + if (this.scheduledThreadPoolMaxSize == null) { + if (other.scheduledThreadPoolMaxSize != null) + return false; + } + else if (!this.scheduledThreadPoolMaxSize.equals(other.scheduledThreadPoolMaxSize)) + return false; + if (this.threadPoolMaxSize == null) { + if (other.threadPoolMaxSize != null) + return false; + } + else if (!this.threadPoolMaxSize.equals(other.threadPoolMaxSize)) + return false; + if (this.transactionBatchSize == null) { + if (other.transactionBatchSize != null) + return false; + } + else if (!this.transactionBatchSize.equals(other.transactionBatchSize)) + return false; + if (this.useGlobalPools == null) { + if (other.useGlobalPools != null) + return false; + } + else if (!this.useGlobalPools.equals(other.useGlobalPools)) + return false; + if (connectorClassName == null) { + if (other.connectorClassName != null) + return false; + } + else if (!connectorClassName.equals(other.connectorClassName)) + return false; + if (this.connectionParameters == null) { + if (other.connectionParameters != null) + return false; + } + else if (!connectionParameters.equals(other.connectionParameters)) + return false; + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((ha == null) ? 0 : ha.hashCode()); + result = prime * result + ((connectionLoadBalancingPolicyClassName == null) ? 0 : connectionLoadBalancingPolicyClassName.hashCode()); + result = prime * result + ((jgroupsFile == null) ? 0 : jgroupsFile.hashCode()); + result = prime * result + ((jgroupsChannelName == null) ? 0 : jgroupsChannelName.hashCode()); + result = prime * result + ((discoveryAddress == null) ? 0 : discoveryAddress.hashCode()); + result = prime * result + ((discoveryPort == null) ? 0 : discoveryPort.hashCode()); + result = prime * result + ((discoveryLocalBindAddress == null) ? 0 : discoveryLocalBindAddress.hashCode()); + result = prime * result + ((discoveryRefreshTimeout == null) ? 0 : discoveryRefreshTimeout.hashCode()); + result = prime * result + ((discoveryInitialWaitTimeout == null) ? 0 : discoveryInitialWaitTimeout.hashCode()); + result = prime * result + ((clientID == null) ? 0 : clientID.hashCode()); + result = prime * result + ((dupsOKBatchSize == null) ? 0 : dupsOKBatchSize.hashCode()); + result = prime * result + ((transactionBatchSize == null) ? 0 : transactionBatchSize.hashCode()); + result = prime * result + ((clientFailureCheckPeriod == null) ? 0 : clientFailureCheckPeriod.hashCode()); + result = prime * result + ((connectionTTL == null) ? 0 : connectionTTL.hashCode()); + result = prime * result + ((cacheLargeMessagesClient == null) ? 0 : cacheLargeMessagesClient.hashCode()); + result = prime * result + ((callTimeout == null) ? 0 : callTimeout.hashCode()); + result = prime * result + ((callFailoverTimeout == null) ? 0 : callFailoverTimeout.hashCode()); + result = prime * result + ((compressLargeMessage == null) ? 0 : compressLargeMessage.hashCode()); + result = prime * result + ((consumerWindowSize == null) ? 0 : consumerWindowSize.hashCode()); + result = prime * result + ((producerWindowSize == null) ? 0 : producerWindowSize.hashCode()); + result = prime * result + ((consumerMaxRate == null) ? 0 : consumerMaxRate.hashCode()); + result = prime * result + ((confirmationWindowSize == null) ? 0 : confirmationWindowSize.hashCode()); + result = prime * result + ((failoverOnInitialConnection == null) ? 0 : failoverOnInitialConnection.hashCode()); + result = prime * result + ((producerMaxRate == null) ? 0 : producerMaxRate.hashCode()); + result = prime * result + ((minLargeMessageSize == null) ? 0 : minLargeMessageSize.hashCode()); + result = prime * result + ((blockOnAcknowledge == null) ? 0 : blockOnAcknowledge.hashCode()); + result = prime * result + ((blockOnNonDurableSend == null) ? 0 : blockOnNonDurableSend.hashCode()); + result = prime * result + ((blockOnDurableSend == null) ? 0 : blockOnDurableSend.hashCode()); + result = prime * result + ((autoGroup == null) ? 0 : autoGroup.hashCode()); + result = prime * result + ((preAcknowledge == null) ? 0 : preAcknowledge.hashCode()); + result = prime * result + ((initialConnectAttempts == null) ? 0 : initialConnectAttempts.hashCode()); + result = prime * result + ((retryInterval == null) ? 0 : retryInterval.hashCode()); + result = prime * result + ((retryIntervalMultiplier == null) ? 0 : retryIntervalMultiplier.hashCode()); + result = prime * result + ((maxRetryInterval == null) ? 0 : maxRetryInterval.hashCode()); + result = prime * result + ((reconnectAttempts == null) ? 0 : reconnectAttempts.hashCode()); + result = prime * result + ((useGlobalPools == null) ? 0 : useGlobalPools.hashCode()); + result = prime * result + ((initialMessagePacketSize == null) ? 0 : initialMessagePacketSize.hashCode()); + result = prime * result + ((scheduledThreadPoolMaxSize == null) ? 0 : scheduledThreadPoolMaxSize.hashCode()); + result = prime * result + ((threadPoolMaxSize == null) ? 0 : threadPoolMaxSize.hashCode()); + result = prime * result + ((groupID == null) ? 0 : groupID.hashCode()); + result = prime * result + ((connectorClassName == null) ? 0 : connectorClassName.hashCode()); + result = prime * result + ((connectionParameters == null) ? 0 : connectionParameters.hashCode()); + return result; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a1199c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java index 9f31308..7718830 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java @@ -16,19 +16,31 @@ */ package org.apache.activemq.artemis.tests.integration.ra; +import javax.jms.QueueConnection; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory; +import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl; +import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager; +import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnection; +import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory; +import org.apache.activemq.artemis.ra.ActiveMQRASession; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; import org.junit.Test; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase { /* @@ -107,4 +119,55 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase { assertNull(secondaryServer.locateQueue(tempQueue)); } + + @Test + public void testOutboundLoadBalancing() throws Exception { + final int CONNECTION_COUNT = 100; + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + List<Session> sessions = new ArrayList<>(); + List<ActiveMQRAManagedConnection> managedConnections = new ArrayList<>(); + + try { + MyBootstrapContext ctx = new MyBootstrapContext(); + qResourceAdapter.start(ctx); + ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager(); + ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setResourceAdapter(qResourceAdapter); + ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + sessions.add(s); + ActiveMQRAManagedConnection mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection(); + managedConnections.add(mc); + ActiveMQConnectionFactory cf1 = mc.getConnectionFactory(); + + long timeout = 10000; + long now = System.currentTimeMillis(); + + while (!((ServerLocatorImpl)cf1.getServerLocator()).isReceivedToplogy()) { + Thread.sleep(50); + } + + for (int i = 0; i < CONNECTION_COUNT; i++) { + queueConnection = qraConnectionFactory.createQueueConnection(); + s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + sessions.add(s); + mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection(); + managedConnections.add(mc); + } + + assertTrue(server.getConnectionCount() >= (CONNECTION_COUNT / 2)); + assertTrue(secondaryServer.getConnectionCount() >= (CONNECTION_COUNT / 2)); + } + finally { + for (Session s : sessions) { + s.close(); + } + + for (ActiveMQRAManagedConnection mc : managedConnections) { + mc.destroy(); + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a1199c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java index 3498ec3..db7f2cd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTest.java @@ -42,9 +42,11 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory; import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl; import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager; +import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnection; import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory; import org.apache.activemq.artemis.ra.ActiveMQRASession; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; @@ -311,4 +313,54 @@ public class OutgoingConnectionTest extends ActiveMQRATestBase { assertTrue(xaResourceWrapper.getProductVersion().equals(VersionLoader.getVersion().getFullVersion())); assertTrue(xaResourceWrapper.getProductName().equals(ActiveMQResourceAdapter.PRODUCT_NAME)); } + + @Test + public void testSharedActiveMQConnectionFactory() throws Exception { + Session s = null; + Session s2 = null; + ActiveMQRAManagedConnection mc = null; + ActiveMQRAManagedConnection mc2 = null; + + try { + resourceAdapter = new ActiveMQResourceAdapter(); + + resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName()); + MyBootstrapContext ctx = new MyBootstrapContext(); + resourceAdapter.start(ctx); + ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager(); + ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setResourceAdapter(resourceAdapter); + ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + mc = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s).getManagedConnection(); + ActiveMQConnectionFactory cf1 = mc.getConnectionFactory(); + + QueueConnection queueConnection2 = qraConnectionFactory.createQueueConnection(); + s2 = queueConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + mc2 = (ActiveMQRAManagedConnection) ((ActiveMQRASession) s2).getManagedConnection(); + ActiveMQConnectionFactory cf2 = mc2.getConnectionFactory(); + + // we're not testing equality so don't use equals(); we're testing if they are actually the *same* object + assertTrue(cf1 == cf2); + } + finally { + if (s != null) { + s.close(); + } + + if (mc != null) { + mc.destroy(); + } + + if (s2 != null) { + s2.close(); + } + + if (mc2 != null) { + mc2.destroy(); + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a1199c4/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java index 5a104ce..74bed99 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ConnectionFactoryPropertiesTest.java @@ -17,10 +17,15 @@ package org.apache.activemq.artemis.tests.unit.ra; import java.beans.PropertyDescriptor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.activemq.artemis.ra.ConnectionFactoryProperties; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; @@ -99,4 +104,85 @@ public class ConnectionFactoryPropertiesTest extends ActiveMQTestBase { } return names; } + + @Test + public void testEquality() throws Exception { + ConnectionFactoryProperties cfp1 = new ConnectionFactoryProperties(); + List<String> connectorClassNames1 = new ArrayList<String>(); + connectorClassNames1.add("myConnector"); + cfp1.setParsedConnectorClassNames(connectorClassNames1); + List<Map<String, Object>> connectionParameters1 = new ArrayList<Map<String, Object>>(); + Map<String, Object> params1 = new HashMap<String, Object>(); + params1.put("port", "0"); + connectionParameters1.add(params1); + cfp1.setParsedConnectionParameters(connectionParameters1); + cfp1.setAutoGroup(true); + + ConnectionFactoryProperties cfp2 = new ConnectionFactoryProperties(); + List<String> connectorClassNames2 = new ArrayList<String>(); + connectorClassNames2.add("myConnector"); + cfp2.setParsedConnectorClassNames(connectorClassNames2); + List<Map<String, Object>> connectionParameters2 = new ArrayList<Map<String, Object>>(); + Map<String, Object> params2 = new HashMap<String, Object>(); + params2.put("port", "0"); + connectionParameters2.add(params2); + cfp2.setParsedConnectionParameters(connectionParameters2); + cfp2.setAutoGroup(true); + + assertTrue(cfp1.equals(cfp2)); + } + + @Test + public void testInequality() throws Exception { + ConnectionFactoryProperties cfp1 = new ConnectionFactoryProperties(); + List<String> connectorClassNames1 = new ArrayList<String>(); + connectorClassNames1.add("myConnector"); + cfp1.setParsedConnectorClassNames(connectorClassNames1); + List<Map<String, Object>> connectionParameters1 = new ArrayList<Map<String, Object>>(); + Map<String, Object> params1 = new HashMap<String, Object>(); + params1.put("port", "0"); + connectionParameters1.add(params1); + cfp1.setParsedConnectionParameters(connectionParameters1); + cfp1.setAutoGroup(true); + + ConnectionFactoryProperties cfp2 = new ConnectionFactoryProperties(); + List<String> connectorClassNames2 = new ArrayList<String>(); + connectorClassNames2.add("myConnector"); + cfp2.setParsedConnectorClassNames(connectorClassNames2); + List<Map<String, Object>> connectionParameters2 = new ArrayList<Map<String, Object>>(); + Map<String, Object> params2 = new HashMap<String, Object>(); + params2.put("port", "1"); + connectionParameters2.add(params2); + cfp2.setParsedConnectionParameters(connectionParameters2); + cfp2.setAutoGroup(true); + + assertFalse(cfp1.equals(cfp2)); + } + + @Test + public void testInequality2() throws Exception { + ConnectionFactoryProperties cfp1 = new ConnectionFactoryProperties(); + List<String> connectorClassNames1 = new ArrayList<String>(); + connectorClassNames1.add("myConnector"); + cfp1.setParsedConnectorClassNames(connectorClassNames1); + List<Map<String, Object>> connectionParameters1 = new ArrayList<Map<String, Object>>(); + Map<String, Object> params1 = new HashMap<String, Object>(); + params1.put("port", "0"); + connectionParameters1.add(params1); + cfp1.setParsedConnectionParameters(connectionParameters1); + cfp1.setAutoGroup(true); + + ConnectionFactoryProperties cfp2 = new ConnectionFactoryProperties(); + List<String> connectorClassNames2 = new ArrayList<String>(); + connectorClassNames2.add("myConnector2"); + cfp2.setParsedConnectorClassNames(connectorClassNames2); + List<Map<String, Object>> connectionParameters2 = new ArrayList<Map<String, Object>>(); + Map<String, Object> params2 = new HashMap<String, Object>(); + params2.put("port", "0"); + connectionParameters2.add(params2); + cfp2.setParsedConnectionParameters(connectionParameters2); + cfp2.setAutoGroup(true); + + assertFalse(cfp1.equals(cfp2)); + } }