This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 9a012c2a66 Revert "ARTEMIS-5627 Disconnect consumers during a Mirror
ACK Retry"
9a012c2a66 is described below
commit 9a012c2a66fd974d1a8493a230d7cd2814f68554
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Aug 26 14:33:44 2025 -0400
Revert "ARTEMIS-5627 Disconnect consumers during a Mirror ACK Retry"
This reverts commit 3bc3d93ed404c325d97d8c8ff8522082ebf63317.
This feature introduced some corner cases (for example, ACKs coming from
management or expiry) that could cause damage that is harder to control
later if misused.
---
.../api/config/ActiveMQDefaultConfiguration.java | 6 -
.../protocol/amqp/broker/AMQPSessionCallback.java | 5 -
.../connect/mirror/AMQPMirrorControllerSource.java | 10 +-
.../connect/mirror/AMQPMirrorControllerTarget.java | 1 -
.../protocol/amqp/connect/mirror/AckManager.java | 39 +----
.../core/protocol/mqtt/MQTTSessionCallback.java | 8 -
.../core/protocol/openwire/amq/AMQSession.java | 7 -
.../artemis/core/protocol/stomp/StompSession.java | 7 -
.../artemis/core/config/Configuration.java | 8 -
.../core/config/impl/ConfigurationImpl.java | 13 --
.../deployers/impl/FileConfigurationParser.java | 3 -
.../impl/ManagementRemotingConnection.java | 5 -
.../protocol/core/impl/CoreSessionCallback.java | 8 -
.../activemq/artemis/core/server/Consumer.java | 7 -
.../apache/activemq/artemis/core/server/Queue.java | 3 -
.../artemis/core/server/impl/QueueImpl.java | 5 -
.../core/server/impl/ServerConsumerImpl.java | 5 -
.../artemis/spi/core/protocol/SessionCallback.java | 2 -
.../resources/schema/artemis-configuration.xsd | 8 -
.../core/config/impl/FileConfigurationTest.java | 1 -
.../resources/ConfigurationTest-full-config.xml | 1 -
.../ConfigurationTest-xinclude-config.xml | 1 -
.../ConfigurationTest-xinclude-schema-config.xml | 1 -
.../unit/core/server/impl/fakes/FakeConsumer.java | 1 +
.../amqp/connect/DisconnectConsumerMirrorTest.java | 163 ---------------------
.../tests/integration/cli/DummyServerConsumer.java | 1 +
.../tests/integration/client/HangConsumerTest.java | 5 -
.../mirror/LargeAccumulationTest.java | 116 ++++++++-------
28 files changed, 75 insertions(+), 365 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 353ad90d9f..c7f7af4ec5 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -720,8 +720,6 @@ public final class ActiveMQDefaultConfiguration {
private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED =
false;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
- private static boolean DEFAULT_MIRROR_DISCONNECT_CONSUMERS = false;
-
private static final boolean DEFAULT_PURGE_PAGE_FOLDERS = false;
private static final int DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS = 30;
@@ -2014,10 +2012,6 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED;
}
- public static boolean getMirrorAckManagerDisconnectConsumers() {
- return DEFAULT_MIRROR_DISCONNECT_CONSUMERS;
- }
-
public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index d2f87961a0..988ebdd5dc 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -789,11 +789,6 @@ public class AMQPSessionCallback implements
SessionCallback {
});
}
- @Override
- public void failConnection(String errorMessage) {
- serverSession.getRemotingConnection().fail(new
ActiveMQException(errorMessage));
- }
-
@Override
public boolean hasCredits(ServerConsumer consumer) {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext)
consumer.getProtocolContext();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index d7192d86ef..fae26f0293 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -298,7 +298,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
String remoteID = getRemoteMirrorId();
if (remoteID == null) {
- // This is to avoid a reflection (Mirror sending messages back to
itself) from a small period of time one node reconnects but not the opposite
direction.
+ // This is to avoid a reflection (Miror sendin messages back to
itself) from a small period of time one node reconnects but not the opposite
direction.
Object localRemoteID = message.getAnnotation(BROKER_ID_SIMPLE_STRING);
if (localRemoteID != null) {
remoteID = String.valueOf(localRemoteID);
@@ -341,22 +341,20 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
SimpleString address = context.getAddress(message);
if (context.isInternal()) {
- logger.trace("sendMessage::server {} is discarding send {} to avoid
sending to internal queue", server, message);
+ logger.trace("sendMessage::server {} is discarding send to avoid
sending to internal queue", server);
return;
}
if (invalidTarget(context.getMirrorSource(), message)) {
- logger.trace("sendMessage::server {} is discarding send {} to avoid
infinite loop (reflection with the mirror)", server, message);
+ logger.trace("sendMessage::server {} is discarding send to avoid
infinite loop (reflection with the mirror)", server);
return;
}
if (ignoreAddress(address)) {
- logger.trace("sendMessage::server {} is discarding send {} to address
{}, address doesn't match filter", server, address, message);
+ logger.trace("sendMessage::server {} is discarding send to address
{}, address doesn't match filter", server, address);
return;
}
- logger.trace("sendMessage::server {} is SENDING {}", server, message);
-
try {
context.setReusable(false);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 485bb298c5..e1e3020c55 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -517,7 +517,6 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
if (internalMirrorID == null) {
internalMirrorID = getRemoteMirrorId(); // not passing the ID means
the data was generated on the remote broker
}
-
Long internalIDLong = (Long)
deliveryAnnotations.getValue().get(INTERNAL_ID);
String internalAddress = (String)
deliveryAnnotations.getValue().get(INTERNAL_DESTINATION);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 747bc812dc..a7f1b8654f 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -47,12 +47,9 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -258,7 +255,7 @@ public class AckManager implements ActiveMQComponent {
// to be used with the same executor as the PagingStore executor
public void retryAddress(SimpleString address,
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
- checkConsumers(address);
+
// This is an optimization:
// we peek at how many records we currently have. When we scan all the
records that were initially input we would
@@ -382,7 +379,7 @@ public class AckManager implements ActiveMQComponent {
}
}
} else {
- logger.trace("Retry {} attempted {} times on paging, Configuration
Page Attempts={}", retry, retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
+ logger.trace("Retry {} queue attempted {} times on paging,
QueueAttempts {} Configuration Page Attempts={}", retry,
retry.getQueueAttempts(), retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
}
}
}
@@ -515,36 +512,6 @@ public class AckManager implements ActiveMQComponent {
}
}
- private void checkConsumers(SimpleString address) {
- if (configuration.isMirrorDisconnectConsumers()) {
- try {
- Bindings bindings =
server.getPostOffice().getBindingsForAddress(address);
- bindings.forEach((n, b) -> {
- if (b instanceof LocalQueueBinding) {
- Queue queue = ((LocalQueueBinding) b).getQueue();
- checkConsumers(queue);
- }
- });
- } catch (Exception e) {
- // nothing that we can do here other than log
- logger.warn(e.getMessage(), e);
- }
- }
- }
-
- private void checkConsumers(Queue queue) {
- if (configuration.isMirrorDisconnectConsumers() &&
queue.getConsumerCount() > 0) {
- if (logger.isDebugEnabled()) {
- logger.debug("Disconnecting consumers on queue {}",
queue.getName());
- }
- queue.forEachConsumer(this::failConsumer);
- }
- }
-
- private void failConsumer(Consumer consumer) {
- consumer.failConnection("Mirror requesting consumers away to perform
proper ACK retries");
- }
-
public boolean ack(String nodeID, Queue targetQueue, long messageID,
AckReason reason, boolean allowRetry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={},
allowRetry={})", nodeID, messageID, targetQueue.getName(), allowRetry);
@@ -558,8 +525,6 @@ public class AckManager implements ActiveMQComponent {
}
if (allowRetry) {
- checkConsumers(targetQueue);
-
if (configuration != null &&
configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount()
> 0) {
ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(),
targetQueue.getName(), nodeID, messageID);
} else {
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index c872e95ab0..9b80d5b66f 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -45,13 +44,6 @@ public class MQTTSessionCallback implements SessionCallback {
return connection.isWritable(callback);
}
-
- @Override
- public void failConnection(String errorMessage) {
- connection.fail(new ActiveMQException(errorMessage));
- }
-
-
@Override
public int sendMessage(MessageReference ref,
ServerConsumer consumer,
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index d44748b73f..4b35b2e656 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -284,12 +283,6 @@ public class AMQSession implements SessionCallback {
return connection.isWritable(callback);
}
- @Override
- public void failConnection(String errorMessage) {
- connection.fail(new ActiveMQException(errorMessage));
- }
-
-
@Override
public void sendProducerCreditsMessage(int credits, SimpleString address) {
// TODO Auto-generated method stub
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index bac854868b..dd85ac6bd2 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
@@ -119,12 +118,6 @@ public class StompSession implements SessionCallback {
return connection.isWritable(callback);
}
- @Override
- public void failConnection(String errorMessage) {
- connection.fail(new ActiveMQException(errorMessage));
- }
-
-
void setServerSession(ServerSession session) {
this.session = session;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index eb1b2c0a4c..1effd74b11 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1526,14 +1526,6 @@ public interface Configuration {
boolean isMirrorAckManagerWarnUnacked();
- /**
- * Should Mirror disconnect consumers in order to clearn Acknowledgement
retries.
- * This is useful in situations where you want consumers connected to only
one side of the Mirrors.
- * */
- Configuration setMirrorDisconnectConsumers(boolean disconnectConsumers);
-
- boolean isMirrorDisconnectConsumers();
-
/**
* Should the system remove page folders once destinations stop paging.
* Default is false, however future major versions will have this as true
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 72d2a2ed53..6c5738a13b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -466,8 +466,6 @@ public class ConfigurationImpl implements Configuration,
Serializable {
private boolean mirrorAckManagerWarnUnacked =
ActiveMQDefaultConfiguration.getMirrorAckManagerWarnUnacked();
- private boolean mirrorDisconnectConsumers =
ActiveMQDefaultConfiguration.getMirrorAckManagerDisconnectConsumers();
-
private int mirrorAckManagerRetryDelay =
ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
private boolean mirrorPageTransaction =
ActiveMQDefaultConfiguration.getMirrorPageTransaction();
@@ -3394,17 +3392,6 @@ public class ConfigurationImpl implements Configuration,
Serializable {
return this;
}
- @Override
- public ConfigurationImpl setMirrorDisconnectConsumers(boolean
disconnectConsumers) {
- this.mirrorDisconnectConsumers = disconnectConsumers;
- return this;
- }
-
- @Override
- public boolean isMirrorDisconnectConsumers() {
- return mirrorDisconnectConsumers;
- }
-
@Override
public ConfigurationImpl setMirrorAckManagerQueueAttempts(int
minQueueAttempts) {
logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}",
minQueueAttempts);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index dd250dc181..8f100cf59a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -395,7 +395,6 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String MIRROR_ACK_MANAGER_RETRY_DELAY =
"mirror-ack-manager-retry-delay";
private static final String MIRROR_ACK_MANAGER_WARN_UNACKED =
"mirror-ack-manager-warn-unacked";
- private static final String MIRROR_DISCONNECT_CONSUMERS =
"mirror-disconnect-consumers";
private static final String MIRROR_PAGE_TRANSACTION =
"mirror-page-transaction";
@@ -890,8 +889,6 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setMirrorAckManagerWarnUnacked(getBoolean(e,
MIRROR_ACK_MANAGER_WARN_UNACKED, config.isMirrorAckManagerWarnUnacked()));
- config.setMirrorDisconnectConsumers(getBoolean(e,
MIRROR_DISCONNECT_CONSUMERS, config.isMirrorDisconnectConsumers()));
-
parseAddressSettings(e, config);
parseResourceLimits(e, config);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index c35187ccd1..b3487a54d0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -260,11 +260,6 @@ public class ManagementRemotingConnection implements
RemotingConnection {
public void disconnect(ServerConsumer consumerId, String message) {
}
- @Override
- public void failConnection(String errorMessage) {
-
- }
-
@Override
public boolean isWritable(ReadyListener callback, Object
protocolContext) {
return false;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 6cd9deb80d..34eac07137 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -81,13 +80,6 @@ public final class CoreSessionCallback implements
SessionCallback {
return connection.isWritable(callback);
}
-
- @Override
- public void failConnection(String errorMessage) {
- connection.fail(new ActiveMQException(errorMessage));
- }
-
-
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer,
MessageReference ref, boolean failed) {
return false;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 0666a494ec..79b97b0834 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -84,13 +84,6 @@ public interface Consumer extends PriorityAware {
*/
void disconnect();
- /**
- * disconnect the consumer
- */
- default void failConnection(String errorMessage) {
- }
-
-
void failed(Throwable t);
/**
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 0da22f18b3..061a32927b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -167,9 +167,6 @@ public interface Queue extends Bindable, CriticalComponent {
void addConsumer(Consumer consumer) throws Exception;
- default void forEachConsumer(java.util.function.Consumer<Consumer>
callback) {
- }
-
void addLingerSession(String sessionId);
void removeLingerSession(String sessionId);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 6f88e57430..aefcba2564 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1228,11 +1228,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
}
- @Override
- public void forEachConsumer(java.util.function.Consumer<Consumer> callback)
{
- consumers.stream().forEach(t -> callback.accept(t.consumer));
- }
-
@Override
public void addLingerSession(String sessionId) {
lingerSessionIds.add(sessionId);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 8a4d56987b..167d9986aa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1166,11 +1166,6 @@ public class ServerConsumerImpl implements
ServerConsumer, ReadyListener {
callback.disconnect(this, "Queue deleted: " + getQueue().getName());
}
- @Override
- public void failConnection(String errorMessage) {
- callback.failConnection(errorMessage);
- }
-
@Override
public void failed(Throwable t) {
try {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 75b81a8645..ad504739cf 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -79,8 +79,6 @@ public interface SessionCallback {
boolean isWritable(ReadyListener callback, Object protocolContext);
- void failConnection(String errorMessage);
-
/**
* Some protocols (Openwire) needs a special message with the browser is
finished.
*/
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 2e9baeb018..5e87d2e00f 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -998,14 +998,6 @@
</xsd:annotation>
</xsd:element>
- <xsd:element name="mirror-disconnect-consumers" type="xsd:boolean"
maxOccurs="1" minOccurs="0" default="false">
- <xsd:annotation>
- <xsd:documentation>
- Should mirror disconnect consumers if needed to proceed with
acks to minimize chances of a missing ack.
- </xsd:documentation>
- </xsd:annotation>
- </xsd:element>
-
<xsd:element name="suppress-session-notifications" type="xsd:boolean"
default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 9823853988..b59bc1ccbb 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -597,7 +597,6 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertEquals(111, configInstance.getMirrorAckManagerQueueAttempts());
assertTrue(configInstance.isMirrorAckManagerWarnUnacked());
- assertTrue(configInstance.isMirrorDisconnectConsumers());
assertEquals(222, configInstance.getMirrorAckManagerPageAttempts());
assertEquals(333, configInstance.getMirrorAckManagerRetryDelay());
assertTrue(configInstance.isMirrorPageTransaction());
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 45e9c42401..b9935e85bb 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -601,7 +601,6 @@
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
- <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
<security-settings>
<security-setting match="a1">
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 80436e0dc2..803eed9747 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -79,7 +79,6 @@
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
- <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index ad1bda4628..eec3aae8e7 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -79,7 +79,6 @@
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
- <mirror-disconnect-consumers>true</mirror-disconnect-consumers>
<xi:include
href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-incoming-interceptors.xml"/>
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 1a6954e4a3..022293328a 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -154,6 +154,7 @@ public class FakeConsumer implements Consumer {
@Override
public void disconnect() {
+ //To change body of implemented methods use File | Settings | File
Templates.
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
deleted file mode 100644
index e968de79f0..0000000000
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/DisconnectConsumerMirrorTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp.connect;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.artemis.api.core.QueueConfiguration;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.core.config.Configuration;
-import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
-import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.tests.util.CFUtil;
-import org.apache.activemq.artemis.tests.util.Wait;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-public class DisconnectConsumerMirrorTest extends ActiveMQTestBase {
-
- private static final int NUMBER_OF_MESSAGES = 5;
-
- ActiveMQServer server1;
- ActiveMQServer server2;
-
- @Test
- public void testDisconnectConsumers() throws Exception {
-
- try {
- String queueName = getName();
-
- {
- Configuration configuration = createDefaultConfig(0, false);
- configuration.setMirrorDisconnectConsumers(true);
- configuration.getAddressConfigurations().clear();
- configuration.setResolveProtocols(true);
-
configuration.setMirrorAckManagerRetryDelay(100).setMirrorAckManagerPageAttempts(5).setMirrorAckManagerQueueAttempts(5);
-
configuration.addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
- configuration.addAcceptorConfiguration("clients",
"tcp://localhost:61616");
- AMQPBrokerConnectConfiguration brokerConnectConfiguration = new
AMQPBrokerConnectConfiguration("toDC2",
"tcp://localhost:61617").setRetryInterval(100).setReconnectAttempts(-1);
- AMQPMirrorBrokerConnectionElement mirror = new
AMQPMirrorBrokerConnectionElement().setDurable(true);
- brokerConnectConfiguration.addMirror(mirror);
- configuration.addAMQPConnection(brokerConnectConfiguration);
- server1 = createServer(true, configuration);
- server1.setIdentity("server1");
- server1.start();
- }
-
- {
- Configuration configuration = createDefaultConfig(1, false);
- configuration.setMirrorDisconnectConsumers(true);
- configuration.setResolveProtocols(true);
-
configuration.setMirrorAckManagerRetryDelay(100).setMirrorAckManagerPageAttempts(5).setMirrorAckManagerQueueAttempts(5);
-
configuration.addQueueConfiguration(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
- configuration.addAcceptorConfiguration("clients",
"tcp://localhost:61617");
- AMQPBrokerConnectConfiguration brokerConnectConfiguration = new
AMQPBrokerConnectConfiguration("toDC1",
"tcp://localhost:61616").setRetryInterval(100).setReconnectAttempts(-1);
- AMQPMirrorBrokerConnectionElement mirror = new
AMQPMirrorBrokerConnectionElement();
- brokerConnectConfiguration.addMirror(mirror);
- configuration.addAMQPConnection(brokerConnectConfiguration);
- server2 = createServer(true, configuration);
- server2.setIdentity("server2");
- server2.start();
- }
-
- validateProtocol("AMQP", queueName);
- validateProtocol("CORE", queueName);
- validateProtocol("OPENWIRE", queueName);
- } finally {
- server1.stop();
- server2.stop();
-
- server1 = null;
- server2 = null;
- }
- }
-
- private void validateProtocol(String protocol, String queueName) throws
Exception {
- Queue mirrorQueue2 =
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC1");
- assertNotNull(mirrorQueue2);
-
- Queue mirrorQueue1 =
server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC2");
- assertNotNull(mirrorQueue1);
-
- ConnectionFactory factory1 = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
- ConnectionFactory factory2 = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61617");
-
- try (Connection connection = factory2.createConnection()) {
- Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
- MessageProducer producer =
session.createProducer(session.createQueue(queueName));
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
- producer.send(session.createTextMessage("hello " + i));
- }
- session.commit();
- }
-
- try (Connection connection1 = factory1.createConnection(); Connection
connection2 = factory2.createConnection()) {
-
- connection1.start();
- connection2.start();
-
- Session session1 = connection1.createSession(true,
Session.SESSION_TRANSACTED);
- MessageConsumer consumer_server1 =
session1.createConsumer(session1.createQueue(queueName));
- assertNotNull(consumer_server1.receive(5000));
-
- Session session2 = connection2.createSession(true,
Session.SESSION_TRANSACTED);
- MessageConsumer consumer_server2 =
session2.createConsumer(session1.createQueue(queueName));
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
- assertNotNull(consumer_server2.receive(5000));
- }
- session2.commit();
-
- Wait.assertEquals(0, mirrorQueue1::getMessageCount);
- Wait.assertEquals(0, mirrorQueue2::getMessageCount);
-
- verifyNoMessages(server1, server2, queueName);
-
- // Consumers on server1 were supposed to be disconnected
- // as instructed on MirrorDisconnectConsumers
- Assertions.assertThrows(JMSException.class, () -> {
- consumer_server1.receive(5000);
- });
- }
- }
-
- private void verifyNoMessages(ActiveMQServer server1,
- ActiveMQServer server2,
- String queueName) throws Exception {
- Queue queueServer1 = server1.locateQueue(queueName);
- Queue queueServer2 = server2.locateQueue(queueName);
-
- Queue mirrorQueue1 =
server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC2");
- Queue mirrorQueue2 =
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_toDC1");
-
- Wait.assertEquals(0L, mirrorQueue1::getMessageCount, 5000, 100);
- Wait.assertEquals(0L, mirrorQueue2::getMessageCount, 5000, 100);
-
- Wait.assertEquals(0L, queueServer1::getMessageCount, 5000, 100);
- Wait.assertEquals(0L, queueServer2::getMessageCount, 5000, 100);
- }
-}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index b27519feb6..6483f07ce9 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -249,6 +249,7 @@ public class DummyServerConsumer implements ServerConsumer {
@Override
public void disconnect() {
+
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 0c539622a3..46901169b7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -519,11 +519,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
return targetCallback.sendLargeMessageContinuation(consumer, body,
continues, requiresResponse);
}
- @Override
- public void failConnection(String errorMessage) {
- targetCallback.failConnection(errorMessage);
- }
-
@Override
public void closed() {
targetCallback.closed();
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
index b46dbee66a..612460ab59 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LargeAccumulationTest.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -47,7 +46,6 @@ import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.TestParameters;
import org.apache.activemq.artemis.utils.Wait;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -125,7 +123,6 @@ public class LargeAccumulationTest extends SoakTestBase {
brokerProperties.put("AMQPConnections." + connectionName + ".type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName +
".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
- brokerProperties.put("mirrorDisconnectConsumers", "true");
brokerProperties.put("pageSyncTimeout", "" +
TimeUnit.MILLISECONDS.toNanos(1));
brokerProperties.put("messageExpiryScanPeriod", "-1");
@@ -157,6 +154,7 @@ public class LargeAccumulationTest extends SoakTestBase {
+ "logger.db1.level=DEBUG"));
}
+
@BeforeAll
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
@@ -264,6 +262,9 @@ public class LargeAccumulationTest extends SoakTestBase {
@Test
public void testLargeAccumulation() throws Exception {
+ final boolean useTopic = true;
+ final boolean useQueue = true;
+
AtomicInteger errors = new AtomicInteger(0);
// producers will have 2 sets of producers (queue and topic)
@@ -296,57 +297,70 @@ public class LargeAccumulationTest extends SoakTestBase {
}
}
- Connection connectionOnServer2 = cfs[1].createConnection();
- runAfter(connectionOnServer2::close);
-
- Session sessionOnSrv2 = connectionOnServer2.createSession(true,
Session.SESSION_TRANSACTED);
-
- MessageConsumer deadConsumer =
sessionOnSrv2.createConsumer(sessionOnSrv2.createQueue(QUEUE_NAME));
-
CountDownLatch doneTopic = null, doneQueue = null;
- doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeTopic,
"LargeMessageTopic");
- doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeQueue,
"LargeMessageQueue");
+ if (useTopic) {
+ doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeTopic,
"LargeMessageTopic");
+ }
+ if (useQueue) {
+ doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES, 10, SIZE_OF_LARGE_MESSAGE, largeQueue,
"LargeMessageQueue");
+ }
- assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ if (useTopic) {
+ assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ }
+ if (useQueue) {
+ assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ }
assertEquals(0, errors.get());
- doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeTopic,
"MediumMessageTopic");
- doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeQueue,
"MediumMessageQueue");
+ if (useTopic) {
+ doneTopic = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeTopic,
"MediumMessageTopic");
+ }
+ if (useQueue) {
+ doneQueue = send(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_REGULAR_MESSAGES, 100, SIZE_OF_REGULAR_MESSAGE, largeQueue,
"MediumMessageQueue");
+ }
- assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
- assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ if (useTopic) {
+ assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ }
+ if (useQueue) {
+ assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ }
assertEquals(0, errors.get());
- matchMessageCounts(sm, (long) (NUMBER_OF_LARGE_MESSAGES +
NUMBER_OF_REGULAR_MESSAGES) * NUMBER_OF_THREADS, true);
+ matchMessageCounts(sm, (long) (NUMBER_OF_LARGE_MESSAGES +
NUMBER_OF_REGULAR_MESSAGES) * NUMBER_OF_THREADS, useTopic, useQueue, true);
- doneQueue = new CountDownLatch(NUMBER_OF_THREADS);
- consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, null, largeQueue,
doneQueue);
- doneTopic = new CountDownLatch(NUMBER_OF_THREADS *
NUMBER_OF_SUBSCRIPTIONS);
- for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
- consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, "sub_" + i,
largeTopic, doneTopic);
+ if (useQueue) {
+ doneQueue = new CountDownLatch(NUMBER_OF_THREADS);
+ consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, null, largeQueue,
doneQueue);
+ }
+ if (useTopic) {
+ doneTopic = new CountDownLatch(NUMBER_OF_THREADS *
NUMBER_OF_SUBSCRIPTIONS);
+ for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
+ consume(service, errors, NUMBER_OF_THREADS, cfs[0],
NUMBER_OF_LARGE_MESSAGES + NUMBER_OF_REGULAR_MESSAGES, 100, "sub_" + i,
largeTopic, doneTopic);
+ }
}
- assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ if (useTopic) {
+ assertTrue(doneTopic.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ }
- assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ if (useQueue) {
+ assertTrue(doneQueue.await(LARGE_TIMEOUT_MINUTES, TimeUnit.MINUTES));
+ }
assertEquals(0, errors.get());
- matchMessageCounts(sm, 0, true);
-
- // deadConsumer was supposed to be disconnected from the ack manager
- Assertions.assertThrows(JMSException.class, () -> {
- deadConsumer.receive(5000);
- });
-
- connectionOnServer2.close();
-
+ matchMessageCounts(sm, 0, useTopic, useQueue, true);
}
- private boolean matchMessageCounts(SimpleManagement[] sm, long
numberOfMessages, boolean useWait) throws Exception {
+ private boolean matchMessageCounts(SimpleManagement[] sm,
+ long numberOfMessages,
+ boolean useTopic,
+ boolean useQueue,
+ boolean useWait) throws Exception {
for (SimpleManagement s : sm) {
logger.debug("Checking counts on SNF for {}", s.getUri());
if (useWait) {
@@ -357,23 +371,27 @@ public class LargeAccumulationTest extends SoakTestBase {
}
}
- for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
- String subscriptionName = "sub_" + i + ":global";
- logger.debug("Checking counts on {} on {}", subscriptionName,
s.getUri());
- if (useWait) {
- Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(subscriptionName),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
- } else {
- if (s.getMessageCountOnQueue(subscriptionName) !=
numberOfMessages) {
- return false;
+ if (useTopic) {
+ for (int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++) {
+ String subscriptionName = "sub_" + i + ":global";
+ logger.debug("Checking counts on {} on {}", subscriptionName,
s.getUri());
+ if (useWait) {
+ Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(subscriptionName),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+ } else {
+ if (s.getMessageCountOnQueue(subscriptionName) !=
numberOfMessages) {
+ return false;
+ }
}
}
}
- if (useWait) {
- Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(QUEUE_NAME),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
- } else {
- if (s.getMessageCountOnQueue(QUEUE_NAME) != numberOfMessages) {
- return false;
+ if (useQueue) {
+ if (useWait) {
+ Wait.assertEquals(numberOfMessages, () ->
s.getMessageCountOnQueue(QUEUE_NAME),
TimeUnit.MINUTES.toMillis(LARGE_TIMEOUT_MINUTES), 100);
+ } else {
+ if (s.getMessageCountOnQueue(QUEUE_NAME) != numberOfMessages) {
+ return false;
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact