This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 3b4e7cb3b7 Revert "Facky tests revealed mainly with faster CI (aka
Github Actions but also with Java 25) (#1610)" (#1617)
3b4e7cb3b7 is described below
commit 3b4e7cb3b73280ebfd0147e413e86bab30d00f10
Author: Matt Pavlovich <[email protected]>
AuthorDate: Fri Jan 16 10:36:58 2026 -0600
Revert "Facky tests revealed mainly with faster CI (aka Github Actions but
also with Java 25) (#1610)" (#1617)
This reverts commit 94c3c3dac2316816c978218443ca9cf88a45a3e8.
Note: Reverting due to accidental inclusion of [AMQ-9829]
ActiveMQMessageConsumer.java
---
.../apache/activemq/ActiveMQMessageConsumer.java | 54 +------
.../PooledConnectionSecurityExceptionTest.java | 65 +-------
.../store/kahadb/scheduler/AMQ7086Test.java | 8 +-
activemq-mqtt/pom.xml | 13 +-
.../transport/mqtt/MQTTProtocolConverterTest.java | 2 +-
.../activemq/ra/ActiveMQConnectionFactoryTest.java | 18 +--
.../activemq/transport/stomp/Stomp11Test.java | 21 +--
.../activemq/transport/stomp/Stomp12Test.java | 28 +---
.../stomp/StompCompositeDestinationTest.java | 16 +-
.../apache/activemq/transport/stomp/StompTest.java | 6 +-
activemq-unit-tests/pom.xml | 15 +-
.../apache/activemq/ZeroPrefetchConsumerTest.java | 21 +--
.../java/org/apache/activemq/bugs/AMQ6815Test.java | 19 +--
.../java/org/apache/activemq/bugs/AMQ7118Test.java | 21 +--
.../jms2/ActiveMQJMS2MessageListenerTest.java | 165 +++++++++------------
.../network/DurableSyncNetworkBridgeTest.java | 27 ----
...callyIncludedDestinationsDuplexNetworkTest.java | 14 +-
.../network/NetworkAdvancedStatisticsTest.java | 59 --------
.../network/VirtualConsumerDemandTest.java | 73 ++-------
.../transport/TransportBrokerTestSupport.java | 1 -
.../nio/AutoNIOJmsDurableTopicSendReceiveTest.java | 2 +-
.../auto/nio/AutoNIOJmsSendAndReceiveTest.java | 2 +-
.../nio/AutoNIOPersistentSendAndReceiveTest.java | 2 +-
.../FailoverDurableSubTransactionTest.java | 8 +-
.../nio/NIOJmsDurableTopicSendReceiveTest.java | 13 +-
.../transport/nio/NIOJmsSendAndReceiveTest.java | 13 +-
.../activemq/transport/nio/NIOSSLLoadTest.java | 2 +-
.../transport/nio/NIOTransportBrokerTest.java | 2 +-
.../activemq/transport/tcp/TransportUriTest.java | 12 +-
.../QueueZeroPrefetchLazyDispatchPriorityTest.java | 34 +----
pom.xml | 8 +-
31 files changed, 157 insertions(+), 587 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index e8ca0b3535..298ce38ddc 100644
---
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -113,7 +113,6 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
class PreviouslyDelivered {
org.apache.activemq.command.Message message;
boolean redelivered;
- boolean prefetchedOnly;
PreviouslyDelivered(MessageDispatch messageDispatch) {
message = messageDispatch.getMessage();
@@ -123,12 +122,6 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
message = messageDispatch.getMessage();
this.redelivered = redelivered;
}
-
- PreviouslyDelivered(MessageDispatch messageDispatch, boolean
redelivered, boolean prefetchedOnly) {
- message = messageDispatch.getMessage();
- this.redelivered = redelivered;
- this.prefetchedOnly = prefetchedOnly;
- }
}
private static final Logger LOG =
LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
@@ -778,9 +771,6 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
// ensure unconsumed are rolledback up front as they may
get redelivered to another consumer
List<MessageDispatch> list =
unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
- if (session.isTransacted()) {
-
capturePrefetchedMessagesForDuplicateSuppression(list);
- }
for (MessageDispatch old : list) {
session.connection.rollbackDuplicate(this,
old.getMessage());
}
@@ -943,16 +933,6 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
- if (session.isTransacted()) {
- PreviouslyDelivered entry = null;
- if (previouslyDeliveredMessages != null) {
- entry =
previouslyDeliveredMessages.get(md.getMessage().getMessageId());
- }
- if (entry != null && entry.prefetchedOnly) {
- entry.prefetchedOnly = false;
- entry.redelivered = true;
- }
- }
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
@@ -1440,8 +1420,7 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
// deliverySequenceId non zero means previously queued
dispatch
- if (this.info.isBrowser() || md.getDeliverySequenceId() !=
0l || isPrefetchedRedelivery(md)
- || !session.connection.isDuplicate(this,
md.getMessage())) {
+ if (this.info.isBrowser() || md.getDeliverySequenceId() !=
0l || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null &&
unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
poisonAck(md, "listener dispatch[" +
md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery
policy limit:" + redeliveryPolicy);
@@ -1591,37 +1570,6 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
LOG.trace("{} tracking existing transacted {} delivered list({})",
getConsumerId(), previouslyDeliveredMessages.transactionId,
deliveredMessages.size());
}
- private void capturePrefetchedMessagesForDuplicateSuppression(final
List<MessageDispatch> list) {
- if (list.isEmpty()) {
- return;
- }
- if (previouslyDeliveredMessages == null) {
- previouslyDeliveredMessages = new
PreviouslyDeliveredMap<MessageId,
PreviouslyDelivered>(session.getTransactionContext().getTransactionId());
- }
- for (MessageDispatch pending : list) {
- if (pending.getMessage() != null) {
-
previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new
PreviouslyDelivered(pending, false, true));
- }
- }
- LOG.trace("{} tracking existing transacted {} prefetched list({})",
getConsumerId(), previouslyDeliveredMessages.transactionId, list.size());
- }
-
- private boolean isPrefetchedRedelivery(final MessageDispatch md) {
- if (!session.isTransacted()) {
- return false;
- }
- if (md.getMessage() == null) {
- return false;
- }
- synchronized (deliveredMessages) {
- if (previouslyDeliveredMessages != null) {
- PreviouslyDelivered entry =
previouslyDeliveredMessages.get(md.getMessage().getMessageId());
- return entry != null && entry.prefetchedOnly;
- }
- }
- return false;
- }
-
public int getMessageSize() {
return unconsumedMessages.size();
}
diff --git
a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSecurityExceptionTest.java
b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSecurityExceptionTest.java
index 9927c365d9..70809f754f 100644
---
a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSecurityExceptionTest.java
+++
b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSecurityExceptionTest.java
@@ -69,7 +69,7 @@ public class PooledConnectionSecurityExceptionTest {
@Test
public void testFailedConnectThenSucceeds() throws JMSException {
try (final Connection connection1 =
pooledConnFact.createConnection("invalid", "credentials")) {
- assertSecurityExceptionOnStart(connection1);
+ assertThrows(JMSSecurityException.class, connection1::start);
try (final Connection connection2 =
pooledConnFact.createConnection("system", "manager")) {
connection2.start();
@@ -93,7 +93,7 @@ public class PooledConnectionSecurityExceptionTest {
onExceptionCalled.countDown();
}
});
- assertSecurityExceptionOnStart(connection1);
+ assertThrows(JMSSecurityException.class, connection1::start);
try (final Connection connection2 =
pooledConnFact.createConnection("system", "manager")) {
connection2.start();
@@ -118,7 +118,7 @@ public class PooledConnectionSecurityExceptionTest {
pooledConnFact.setMaxConnections(1);
try (final Connection connection1 =
pooledConnFact.createConnection("invalid", "credentials")) {
- assertSecurityExceptionOnStart(connection1);
+ assertThrows(JMSSecurityException.class, connection1::start);
// The pool should process the async error
// we should eventually get a different connection instance from
the pool regardless of the underlying connection
@@ -145,9 +145,9 @@ public class PooledConnectionSecurityExceptionTest {
pooledConnFact.setMaxConnections(10);
try (final Connection connection1 =
pooledConnFact.createConnection("invalid", "credentials")) {
- assertSecurityExceptionOnStart(connection1);
+ assertThrows(JMSSecurityException.class, connection1::start);
try (final Connection connection2 =
pooledConnFact.createConnection("invalid", "credentials")) {
- assertSecurityExceptionOnStart(connection2);
+ assertThrows(JMSSecurityException.class, connection2::start);
assertNotSame(connection1, connection2);
}
}
@@ -165,7 +165,7 @@ public class PooledConnectionSecurityExceptionTest {
pooledConnFact.setMaxConnections(1);
try (final Connection connection =
pooledConnFact.createConnection("invalid", "credentials")) {
- assertSecurityExceptionOnStart(connection);
+ assertThrows(JMSSecurityException.class, connection::start);
try (final Connection connection2 =
pooledConnFact.createConnection("system", "manager")) {
connection2.start();
@@ -185,7 +185,7 @@ public class PooledConnectionSecurityExceptionTest {
pooledConnFact.setMaxConnections(1);
try (final PooledConnection connection1 = (PooledConnection)
pooledConnFact.createConnection("invalid", "credentials")) {
- assertSecurityExceptionOnStart(connection1);
+ assertThrows(JMSSecurityException.class, connection1::start);
// The pool should process the async error
assertTrue("Should get new connection", Wait.waitFor(new
Wait.Condition() {
@@ -202,7 +202,7 @@ public class PooledConnectionSecurityExceptionTest {
try (final PooledConnection connection2 = (PooledConnection)
pooledConnFact.createConnection("invalid", "credentials")) {
assertNotSame(connection1.pool, connection2.pool);
- assertSecurityExceptionOnStart(connection2);
+ assertThrows(JMSSecurityException.class, connection2::start);
}
}
}
@@ -230,55 +230,6 @@ public class PooledConnectionSecurityExceptionTest {
return name.getMethodName();
}
- /**
- * Helper method to assert that a connection start fails with security
exception.
- * On different test environments, the connection may be disposed
asynchronously
- * before the security exception is fully propagated, resulting in either
JMSSecurityException
- * or generic JMSException with "Disposed" message. Both indicate
authentication failure.
- *
- * This method uses an ExceptionListener to detect when async disposal
completes, providing
- * more reliable detection of security failures across different Java
versions and environments.
- *
- * @param connection the connection to start
- * @throws AssertionError if no exception is thrown or the exception
doesn't indicate auth failure
- */
- private void assertSecurityExceptionOnStart(final Connection connection) {
- try {
- final ExceptionListener listener =
connection.getExceptionListener();
- if (listener == null) { // some tests already leverage the
exception listener
- final CountDownLatch exceptionLatch = new CountDownLatch(1);
-
- // Install listener to capture async exception propagation
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(final JMSException exception) {
- LOG.info("Connection received exception: {}",
exception.getMessage());
- assertTrue(exception instanceof JMSSecurityException);
- exceptionLatch.countDown();
- }
- });
- connection.start(); // should trigger the security exception
reliably and asynchronously
- exceptionLatch.await(1, java.util.concurrent.TimeUnit.SECONDS);
-
- } else {
-
- // Attempt to start and capture the synchronous exception.
- final JMSException thrownException =
assertThrows(JMSException.class, connection::start);
- assertTrue("Should be JMSSecurityException or disposed due to
security exception",
- thrownException instanceof JMSSecurityException ||
- thrownException.getMessage().contains("Disposed"));
- }
-
-
- } catch (final JMSException e) {
- // Ignore
-
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- }
-
@Before
public void setUp() throws Exception {
LOG.info("========== start " + getName() + " ==========");
diff --git
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
index 1335ca5d79..7ddf28181f 100644
---
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
+++
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
@@ -55,7 +55,7 @@ public class AMQ7086Test {
LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
int numKahadbFiles =
kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();
- LOG.info("Num files, job store: {}, message store: {}",
numSchedulerFiles, numKahadbFiles);
+ LOG.info("Num files, job store: {}, message store: {}",
numKahadbFiles, numKahadbFiles);
// pull the dirs before we stop
File jobDir = jobSchedulerStore.getJournal().getDirectory();
@@ -94,10 +94,8 @@ public class AMQ7086Test {
brokerService.stop();
- final int jobFilesOnDisk = verifyFilesOnDisk(jobDir);
- final int kahaFilesOnDisk = verifyFilesOnDisk(kahaDir);
- assertTrue("Expected job store data files at least " +
numSchedulerFiles, jobFilesOnDisk >= numSchedulerFiles);
- assertTrue("Expected kahadb data files at least " + numKahadbFiles,
kahaFilesOnDisk >= numKahadbFiles);
+ assertEquals("Expected job store data files", numSchedulerFiles,
verifyFilesOnDisk(jobDir));
+ assertEquals("Expected kahadb data files", numKahadbFiles,
verifyFilesOnDisk(kahaDir));
}
private int verifyFilesOnDisk(File directory) {
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index ac5398ac4e..55ba9d1ca9 100644
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -198,23 +198,12 @@
</plugins>
</pluginManagement>
<plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>properties</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
- <argLine>-javaagent:${org.mockito:mockito-core:jar}</argLine>
+ <argLine>${surefire.argLine}</argLine>
<runOrder>alphabetical</runOrder>
<systemPropertyValues>
<org.apache.activemq.default.directory.prefix>target</org.apache.activemq.default.directory.prefix>
diff --git
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java
index 3a1fd20d8f..c445f924ac 100644
---
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java
+++
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java
@@ -129,7 +129,7 @@ public class MQTTProtocolConverterTest {
executorService.awaitTermination(10, TimeUnit.SECONDS);
ArgumentCaptor<RemoveInfo> removeInfo =
ArgumentCaptor.forClass(RemoveInfo.class);
- Mockito.verify(transport,
times(1)).sendToActiveMQ(removeInfo.capture());
+ Mockito.verify(transport,
times(4)).sendToActiveMQ(removeInfo.capture());
}
}
diff --git
a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
index 57271e5e77..7a4f24991a 100644
---
a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
+++
b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
@@ -136,7 +136,7 @@ public class ActiveMQConnectionFactoryTest {
try {
final TransportConnector transportConnector =
brokerService.getTransportConnectors().get(0);
- String failoverUrl =
String.format("failover:(%s)?maxReconnectAttempts=10&initialReconnectDelay=100",
transportConnector.getConnectUri());
+ String failoverUrl =
String.format("failover:(%s)?maxReconnectAttempts=1",
transportConnector.getConnectUri());
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.start(null);
@@ -165,22 +165,6 @@ public class ActiveMQConnectionFactoryTest {
transportConnector.start();
- // Wait for failover to reconnect and recover() to succeed
- // The ReconnectingXAResource should handle reconnection
transparently
- final XAResource resource = resources[0];
- assertTrue("connection re-established and can recover",
Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- try {
- resource.recover(100);
- return true;
- } catch (Exception e) {
- // Still reconnecting
- return false;
- }
- }
- }, 30000, 500));
-
// should recover ok
assertEquals("no pending transactions", 0,
resources[0].recover(100).length);
diff --git
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
index 789c2679e7..a9e75f3e12 100644
---
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
+++
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
@@ -565,26 +565,13 @@ public class Stomp11Test extends StompTestSupport {
received.getHeaders().get("message-id") + "\n\n" +
Stomp.NULL;
stompConnection.sendFrame(ack);
- // Unsubscribe immediately after invalid ACK to prevent message
redelivery
- // while waiting for ERROR frame. This avoids race condition where
message
- // could be redelivered before ERROR is received.
+ StompFrame error = stompConnection.receive();
+ LOG.info("Received Frame: {}", error);
+ assertTrue("Expected ERROR but got: " + error.getAction(),
error.getAction().equals("ERROR"));
+
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" +
getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);
-
- // Receive frames until we get the ERROR frame, ignoring any MESSAGE
frames
- // that arrive due to redelivery (especially relevant for SSL
transport)
- StompFrame error = null;
- for (int i = 0; i < 5; i++) {
- error = stompConnection.receive();
- LOG.info("Received Frame: {}", error);
- if (error.getAction().equals("ERROR")) {
- break;
- }
- // If we get a MESSAGE, it's a redelivery - keep trying for ERROR
- }
- assertNotNull("Did not receive any frame", error);
- assertTrue("Expected ERROR but got: " + error.getAction(),
error.getAction().equals("ERROR"));
}
@Test(timeout = 60000)
diff --git
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
index 2e4c85e6be..504a20ff97 100644
---
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
+++
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
@@ -17,7 +17,6 @@
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -159,32 +158,9 @@ public class Stomp12Test extends StompTestSupport {
String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- // Unsubscribe immediately to prevent message redelivery while waiting
for ERROR
- String unsubscribe = "UNSUBSCRIBE\n" + "id:1\n\n" + Stomp.NULL;
- stompConnection.sendFrame(unsubscribe);
-
- // Receive frames until we get the ERROR frame, ignoring any MESSAGE
frames
- // that arrive due to redelivery (especially relevant for SSL
transport)
- StompFrame error = null;
- for (int i = 0; i < 5; i++) {
- error = stompConnection.receive();
- LOG.info("Broker sent: " + error);
- if (error.getAction().equals("ERROR")) {
- break;
- }
- // If we get a MESSAGE, it's a redelivery - keep trying for ERROR
- }
- assertNotNull("Did not receive any frame", error);
- assertTrue("Expected ERROR but got: " + error.getAction(),
error.getAction().equals("ERROR"));
-
- // Re-subscribe to receive the message again and test correct ACK
- stompConnection.sendFrame(subscribe);
- receipt = stompConnection.receive();
- assertTrue(receipt.getAction().startsWith("RECEIPT"));
-
received = stompConnection.receive();
- assertTrue(received.getAction().equals("MESSAGE"));
- ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+ assertTrue(received.getAction().equals("ERROR"));
+ LOG.info("Broker sent: " + received);
// Now place it in the correct location and check it still works.
frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;
diff --git
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
index ad22363819..d3fced4f9a 100644
---
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
+++
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
@@ -225,22 +225,14 @@ public class StompCompositeDestinationTest extends
StompTestSupport {
}
}, TimeUnit.SECONDS.toMillis(30),
TimeUnit.MILLISECONDS.toMillis(150)));
- final QueueViewMBean viewOfA = getProxyToQueue(destinationA);
- final QueueViewMBean viewOfB = getProxyToQueue(destinationB);
+ QueueViewMBean viewOfA = getProxyToQueue(destinationA);
+ QueueViewMBean viewOfB = getProxyToQueue(destinationB);
assertNotNull(viewOfA);
assertNotNull(viewOfB);
- assertTrue("Queues should each have 1 message", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- try {
- return viewOfA.getQueueSize() == 1 &&
viewOfB.getQueueSize() == 1;
- } catch (Exception ignored) {
- return false;
- }
- }
- }, TimeUnit.SECONDS.toMillis(30),
TimeUnit.MILLISECONDS.toMillis(150)));
+ assertEquals(1, viewOfA.getQueueSize());
+ assertEquals(1, viewOfB.getQueueSize());
stompConnection.disconnect();
}
diff --git
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index eba3c6747f..50349ec6ca 100644
---
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -2271,11 +2271,7 @@ public class StompTest extends StompTestSupport {
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
- // Handle both CONNECTED (successful re-connect) and ERROR (already
connected)
- // Different STOMP transports may behave differently
- if (!frame.startsWith("CONNECTED") && !frame.startsWith("ERROR")) {
- fail("Expected CONNECTED or ERROR but got: " + frame);
- }
+ assertTrue(frame.startsWith("CONNECTED"));
boolean gotMessage = false;
boolean gotReceipt = false;
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 4511726bed..38f589c66f 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -279,7 +279,7 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
@@ -394,17 +394,6 @@
</instructions>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>properties</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
@@ -418,7 +407,6 @@
<runOrder>alphabetical</runOrder>
<reportFormat>plain</reportFormat>
<failIfNoTests>false</failIfNoTests>
- <argLine>-javaagent:${org.mockito:mockito-core:jar}</argLine>
<excludedGroups>org.apache.activemq.test.annotations.ParallelTest</excludedGroups>
<systemPropertyVariables>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/</org.apache.activemq.default.directory.prefix>
@@ -484,7 +472,6 @@
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds> <!-- max
time tests may run -->
<forkedProcessExitTimeoutInSeconds>30</forkedProcessExitTimeoutInSeconds> <!--
max time JVM may hang after tests -->
<failIfNoTests>false</failIfNoTests>
- <argLine>-javaagent:${org.mockito:mockito-core:jar}</argLine>
<systemPropertyVariables>
<org.apache.activemq.default.directory.prefix>${project.build.directory}/parallel-tests-${surefire.forkNumber}/</org.apache.activemq.default.directory.prefix>
<!-- when running tests in parallel in the CI (quite slow) we
need to bump the wireformat negotiation timeout (5s by default) -->
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index abc25438f3..b7297e8650 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -348,37 +348,22 @@ public class ZeroPrefetchConsumerTest extends
EmbeddedBrokerTestSupport {
public void testBrokerZeroPrefetchConfigWithConsumerControl() throws
Exception {
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(brokerZeroQueue);
-
- // Wait for broker subscription to be created and policy applied
- final ActiveMQDestination transformedDest =
ActiveMQDestination.transform(brokerZeroQueue);
- org.apache.activemq.util.Wait.waitFor(new
org.apache.activemq.util.Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
- &&
!broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
- }
- }, 5000, 100);
-
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(brokerZeroQueue);
assertEquals("broker config prefetch in effect", 0,
consumer.info.getCurrentPrefetchSize());
// verify sub view broker
Subscription sub =
-
broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().get(0);
+
broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0);
assertEquals("broker sub prefetch is correct", 0,
sub.getConsumerInfo().getCurrentPrefetchSize());
// manipulate Prefetch (like failover and stomp)
ConsumerControl consumerControl = new ConsumerControl();
consumerControl.setConsumerId(consumer.info.getConsumerId());
- consumerControl.setDestination(transformedDest);
+
consumerControl.setDestination(ActiveMQDestination.transform(brokerZeroQueue));
consumerControl.setPrefetch(1000); // default for a q
Object reply = ((ActiveMQConnection)
connection).getTransport().request(consumerControl);
assertTrue("good request", !(reply instanceof ExceptionResponse));
-
- // Wait for the ConsumerControl to be processed
- Thread.sleep(500);
-
assertEquals("broker config prefetch in effect", 0,
consumer.info.getCurrentPrefetchSize());
assertEquals("broker sub prefetch is correct", 0,
sub.getConsumerInfo().getCurrentPrefetchSize());
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
index e2a75633a4..80f8cadf7d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
@@ -29,15 +29,12 @@ import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertTrue;
public class AMQ6815Test {
@@ -86,18 +83,12 @@ public class AMQ6815Test {
sendMessages(5000); // 5k of 1k messages = 5MB and limit is 1MB so
some will be paged to disk
- final long maxUsedMemory = 5L * MEM_LIMIT;
- final long[] usedMem = new long[1];
- boolean withinLimit = Wait.waitFor(() -> {
- Runtime.getRuntime().gc();
- usedMem[0] = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory() - initUsedMemory;
- return usedMem[0] < maxUsedMemory;
- }, TimeUnit.SECONDS.toMillis(30), TimeUnit.SECONDS.toMillis(1));
-
- LOG.info("Mem in use: " + usedMem[0] / 1024 + "K");
+ Runtime.getRuntime().gc();
+ long usedMem = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory() - initUsedMemory;
+ LOG.info("Mem in use: " + usedMem/1024 + "K");
- // 5 is a generous factor because we don't create this many
additional objects per message
- assertTrue("Used Mem reasonable " + usedMem[0], withinLimit);
+ // 2 is a big generous factor because we don't create this many
additional objects per message
+ assertTrue("Used Mem reasonable " + usedMem, usedMem < 5 * MEM_LIMIT);
}
protected void sendMessages(int count) throws JMSException {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
index 8d54527f68..84f29ebb97 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
@@ -133,14 +133,12 @@ public class AMQ7118Test {
}
LOG.info("All messages Consumed.");
- // Clean up the log files and verify compaction reduces file count
- // Note: Don't check exact file names as they vary due to async
preallocation and timing
- // The nextDataFileId (in
org.apache.activemq.store.kahadb.disk.journal.Journal.newDataFile) counter
never decreases,
- // so file numbers depend on how many files were created during the
run, not just how many currently exist
- checkFiles(true, 5, null); // After consumption, should compact to <=
5 files
- checkFiles(true, 5, null); // Verify it stabilizes
- checkFiles(true, 5, null); // And stays stable
- checkFiles(true, 5, null); // One more check
+ //Clean up the log files and be sure its stable
+ checkFiles(true, 2, "db-30.log");
+ checkFiles(true, 3, "db-31.log");
+ checkFiles(true, 2, "db-31.log");
+ checkFiles(true, 2, "db-31.log");
+ checkFiles(true, 2, "db-31.log");
broker.stop();
broker.waitUntilStopped();
@@ -195,8 +193,7 @@ public class AMQ7118Test {
boolean settled = Wait.waitFor(() -> {
File[] current = dbfiles.listFiles(lff);
Arrays.sort(current, new DBFileComparator());
- // If lastFileName is null, only check file count (for cases where
exact file name is non-deterministic)
- return current.length <= expectedCount + 1 && (lastFileName ==
null || current[current.length - 1].getName().equals(lastFileName));
+ return current.length <= expectedCount + 1 &&
current[current.length - 1].getName().equals(lastFileName);
}, 30_000, 1_000);
File files[] = dbfiles.listFiles(lff);
@@ -206,9 +203,7 @@ public class AMQ7118Test {
assertTrue("KahaDB log compaction did not settle in time", settled);
assertTrue("Unexpected number of log files: " + files.length,
files.length <= expectedCount + 1);
- if (lastFileName != null) {
- assertEquals(lastFileName, files[files.length-1].getName());
- }
+ assertEquals(lastFileName, files[files.length-1].getName());
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
index c7263cb46c..2ca9209820 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2MessageListenerTest.java
@@ -77,108 +77,83 @@ public class ActiveMQJMS2MessageListenerTest extends
ActiveMQJMS2TestBase {
@Test
public void testMessageListener() {
- try(JMSContext producerContext =
activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS,
ackMode)) {
- assertNotNull(producerContext);
- Destination destination =
ActiveMQJMS2TestSupport.generateDestination(producerContext, destinationType,
destinationName);
+ try(JMSContext jmsContext =
activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS,
ackMode)) {
+ assertNotNull(jmsContext);
+ Destination destination =
ActiveMQJMS2TestSupport.generateDestination(jmsContext, destinationType,
destinationName);
assertNotNull(destination);
-
- // Use a separate context for consuming to avoid issues with
AUTO_ACKNOWLEDGE mode
- // when using the same context for both producing and consuming
- try(JMSContext consumerContext =
activemqConnectionFactory.createContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS,
ackMode)) {
- JMSConsumer jmsConsumer =
consumerContext.createConsumer(destination);
-
- AtomicInteger receivedMessageCount = new AtomicInteger();
- AtomicInteger exceptionCount = new AtomicInteger();
- CountDownLatch countDownLatch = new CountDownLatch(2);
-
- jmsConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- receivedMessageCount.incrementAndGet();
- countDownLatch.countDown();
- try {
- switch(ackMode) {
- case Session.CLIENT_ACKNOWLEDGE:
message.acknowledge(); break;
- case ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE:
message.acknowledge(); break;
- default: break;
- }
- } catch (JMSException e) {
- exceptionCount.incrementAndGet();
- }
- }
- });
-
- // Start consuming before sending messages (original test
behavior)
- consumerContext.start();
-
- Message message =
ActiveMQJMS2TestSupport.generateMessage(producerContext, "text",
messagePayload);
-
- List<String> sentMessageIds = new LinkedList<>();
- for(int deliveryMode :
Arrays.asList(DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT)) {
- MessageData sendMessageData = new MessageData();
-
sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
-
sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(producerContext,
destination, sendMessageData));
- }
-
- // Wait for the queue to be created and MBean to be registered
before accessing it
- final ActiveMQDestination activeMQDestination =
(ActiveMQDestination)destination;
- assertTrue("Queue MBean not registered in time",
Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- try {
- QueueViewMBean testMBean =
getQueueViewMBean(activeMQDestination);
- return testMBean != null &&
testMBean.getEnqueueCount() >= 0;
- } catch (Exception e) {
- return false;
+ QueueViewMBean localQueueViewMBean =
getQueueViewMBean((ActiveMQDestination)destination);
+ JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
+
+ AtomicInteger receivedMessageCount = new AtomicInteger();
+ AtomicInteger exceptionCount = new AtomicInteger();
+ CountDownLatch countDownLatch = new CountDownLatch(2);
+
+ jmsConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ receivedMessageCount.incrementAndGet();
+ countDownLatch.countDown();
+ try {
+ switch(ackMode) {
+ case Session.CLIENT_ACKNOWLEDGE:
message.acknowledge(); break;
+ case ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE:
message.acknowledge(); break;
+ default: break;
}
+ } catch (JMSException e) {
+ exceptionCount.incrementAndGet();
}
- }, 5000L, 100L));
-
- QueueViewMBean localQueueViewMBean =
getQueueViewMBean(activeMQDestination);
-
- // For session transacted ack we ack after all messages are
sent
- switch(ackMode) {
- case ActiveMQSession.SESSION_TRANSACTED:
- assertEquals(Long.valueOf(0),
Long.valueOf(localQueueViewMBean.getEnqueueCount()));
- producerContext.commit();
- assertEquals(Long.valueOf(2),
Long.valueOf(localQueueViewMBean.getEnqueueCount()));
- break;
- default:
- assertEquals(Long.valueOf(2),
Long.valueOf(localQueueViewMBean.getEnqueueCount()));
- break;
}
-
- assertTrue("Did not receive all messages in time",
countDownLatch.await(10, TimeUnit.SECONDS));
-
- assertEquals(Integer.valueOf(2),
Integer.valueOf(receivedMessageCount.get()));
- assertEquals(Integer.valueOf(0),
Integer.valueOf(exceptionCount.get()));
-
- // For session transacted we ack after all messages are
received
- switch(ackMode) {
- case ActiveMQSession.SESSION_TRANSACTED:
- assertEquals(Long.valueOf(0),
Long.valueOf(localQueueViewMBean.getDequeueCount()));
- consumerContext.commit();
- break;
- default: break;
+ });
+ jmsContext.start();
+
+ Message message =
ActiveMQJMS2TestSupport.generateMessage(jmsContext, "text", messagePayload);
+
+ List<String> sentMessageIds = new LinkedList<>();
+ for(int deliveryMode : Arrays.asList(DeliveryMode.NON_PERSISTENT,
DeliveryMode.PERSISTENT)) {
+ MessageData sendMessageData = new MessageData();
+
sendMessageData.setMessage(message).setDeliveryMode(deliveryMode);
+
sentMessageIds.add(ActiveMQJMS2TestSupport.sendMessage(jmsContext, destination,
sendMessageData));
+ }
+
+ // For session transacted ack we ack after all messages are sent
+ switch(ackMode) {
+ case ActiveMQSession.SESSION_TRANSACTED:
+ assertEquals(Long.valueOf(0),
Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+ jmsContext.commit();
+ assertEquals(Long.valueOf(2),
Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+ break;
+ default:
+ assertEquals(Long.valueOf(2),
Long.valueOf(localQueueViewMBean.getEnqueueCount()));
+ break;
+ }
+
+ assertTrue("Did not receive all messages in time",
countDownLatch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(Integer.valueOf(2),
Integer.valueOf(receivedMessageCount.get()));
+ assertEquals(Integer.valueOf(0),
Integer.valueOf(exceptionCount.get()));
+
+ // For session transacted we ack after all messages are received
+ switch(ackMode) {
+ case ActiveMQSession.SESSION_TRANSACTED:
+ assertEquals(Long.valueOf(0),
Long.valueOf(localQueueViewMBean.getDequeueCount()));
+ jmsContext.commit();
+ break;
+ default: break;
+ }
+ jmsConsumer.close();
+
+ final Logger logger = LoggerFactory.getLogger(this.getClass());
+ assertTrue("Queue should drain in time", Wait.waitFor(new
Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ logger.info("Current Queue size: " +
localQueueViewMBean.getQueueSize() +
+ ", dequeue count: " +
localQueueViewMBean.getDequeueCount());
+ return localQueueViewMBean.getQueueSize() == 0L &&
localQueueViewMBean.getDequeueCount() >= 2L;
}
- jmsConsumer.close();
-
- final Logger logger = LoggerFactory.getLogger(this.getClass());
- assertTrue("Queue should drain in time", Wait.waitFor(new
Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- logger.info("Current Queue size: " +
localQueueViewMBean.getQueueSize() +
- ", dequeue count: " +
localQueueViewMBean.getDequeueCount());
- return localQueueViewMBean.getQueueSize() == 0L &&
localQueueViewMBean.getDequeueCount() >= 2L;
- }
- }, 60000L, 200L));
- } // Close consumer context
+ }, 60000L, 200L));
} catch (Exception e) {
- e.printStackTrace();
- Throwable cause = e.getCause();
- String causeMsg = cause != null ? " Cause: " +
cause.getClass().getName() + ": " + cause.getMessage() : "";
- fail("Test failed: " + e.getClass().getName() + ": " +
e.getMessage() + causeMsg);
+ fail(e.getMessage());
}
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 9727dbc41a..a1329ae9c7 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -138,10 +138,6 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
- // Wait for subscription to become inactive before attempting removal
- // It's very important to wait here, otherwise the removal may not
propagate
- waitForSubscriptionInactive(broker1, topic, subName);
-
removeSubscription(broker1, subName);
assertSubscriptionsCount(broker1, topic, 0);
@@ -873,27 +869,4 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
return brokerService;
}
- /**
- * Wait for a durable subscription to become inactive before attempting
removal.
- * This prevents "Durable consumer is in use" errors when consumer close
operations
- * complete asynchronously (especially visible with Java 25's different
thread scheduling).
- */
- protected void waitForSubscriptionInactive(final BrokerService
brokerService,
- final ActiveMQTopic topic,
- final String subName) throws
Exception {
- assertTrue("Subscription should become inactive", Wait.waitFor(new
Condition() {
- @Override
- public boolean isSatisified() throws Exception {
-
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs =
getSubscriptions(brokerService, topic);
- for
(org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
- if
(sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
- return !sub.isActive();
- }
- }
- // If subscription doesn't exist, it's considered inactive
- return true;
- }
- }, 5000, 100));
- }
-
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
index 39018a18fc..5888c53c5a 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -110,17 +110,11 @@ public class
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
return bridge;
}
- public TransportConnection getDuplexBridgeConnectionFromRemote() throws
Exception {
- final TransportConnector transportConnector =
remoteBroker.getTransportConnectorByScheme("tcp");
- assertTrue("Timed out waiting for duplex bridge connection",
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() {
- return !transportConnector.getConnections().isEmpty();
- }
- }));
+ public TransportConnection getDuplexBridgeConnectionFromRemote() {
+ TransportConnector transportConnector =
remoteBroker.getTransportConnectorByScheme("tcp");
CopyOnWriteArrayList<TransportConnection> transportConnections =
transportConnector.getConnections();
- return transportConnections.get(0);
+ TransportConnection duplexBridgeConnectionFromRemote =
transportConnections.get(0);
+ return duplexBridgeConnectionFromRemote;
}
@Override
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
index 50f61d35de..c3cde8008e 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
@@ -137,15 +137,6 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
localConnection.start();
remoteConnection.start();
- // Wait for network bridge to be established before sending messages
- waitForBridgeFormation();
-
- // For topics, wait for consumer demand to be propagated to local
broker
- // This is critical for non-durable topics to avoid message loss
- if (includedDestination.isTopic()) {
- waitForConsumerDemandPropagation();
- }
-
MessageProducer producer =
localSession.createProducer(includedDestination);
String lastIncludedSentMessageID = null;
for (int i = 0; i < MESSAGE_COUNT; i++) {
@@ -258,56 +249,6 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
remoteConsumer.close();
}
- /**
- * Waits for the network bridge to be fully established between brokers.
- * This prevents race conditions where messages are sent before the bridge
is ready.
- */
- protected void waitForBridgeFormation() throws Exception {
- // Wait for local broker's network connector to have active bridges
- assertTrue("Local broker bridge not formed in time", Wait.waitFor(new
Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return !localBroker.getNetworkConnectors().isEmpty()
- &&
!localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty();
- }
- }, 10000, 100));
-
- // Wait for remote broker's network connector to have active bridges
- assertTrue("Remote broker bridge not formed in time", Wait.waitFor(new
Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return !remoteBroker.getNetworkConnectors().isEmpty()
- &&
!remoteBroker.getNetworkConnectors().get(0).activeBridges().isEmpty();
- }
- }, 10000, 100));
- }
-
- /**
- * Waits for consumer demand to be propagated from remote broker to local
broker.
- * This is critical for topics (especially non-durable) to ensure messages
aren't lost.
- */
- protected void waitForConsumerDemandPropagation() throws Exception {
- assertTrue("Consumer demand not propagated in time", Wait.waitFor(new
Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- try {
- // Check if local broker has network consumers for the
included destination
- // This indicates demand has been propagated from remote
broker
- return
localBroker.getDestination(includedDestination).getConsumers().size() > 0;
- } catch (Exception e) {
- return false;
- }
- }
- }, 10000, 100));
-
- // For durable subscriptions, wait additional time to ensure the
durable subscriber
- // is fully registered and recognized by the network bridge. This
prevents the first
- // message from being sent before the subscription is completely
established.
- if (durable && includedDestination.isTopic()) {
- Thread.sleep(2000);
- }
- }
-
protected void assertNetworkBridgeStatistics(final long expectedLocalSent,
final long expectedRemoteSent) throws Exception {
final NetworkBridge localBridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 23b5375413..409069b2c1 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -554,6 +554,7 @@ public class VirtualConsumerDemandTest extends
DynamicNetworkTestSupport {
MessageProducer includedProducer =
localSession.createProducer(included);
Message test = localSession.createTextMessage("test");
+ Thread.sleep(1000);
final DestinationStatistics destinationStatistics =
localBroker.getDestination(included).getDestinationStatistics();
final DestinationStatistics remoteDestStatistics =
remoteBroker.getDestination(
@@ -666,6 +667,7 @@ public class VirtualConsumerDemandTest extends
DynamicNetworkTestSupport {
MessageProducer includedProducer =
localSession.createProducer(included);
Message test = localSession.createTextMessage("test");
+ Thread.sleep(1000);
final DestinationStatistics destinationStatistics =
localBroker.getDestination(included).getDestinationStatistics();
final DestinationStatistics remoteDestStatistics =
remoteBroker.getDestination(
@@ -684,26 +686,10 @@ public class VirtualConsumerDemandTest extends
DynamicNetworkTestSupport {
assertEquals("remote2 dest messages", 1,
remoteDestStatistics2.getMessages().getCount());
runtimeBroker.setVirtualDestinations(new VirtualDestination[] {},
true);
- assertTrue("virtual destinations not cleared",
- Wait.waitFor(() -> {
- try {
- assertAdvisoryBrokerCounts(0, 0, 0);
- return true;
- } catch (AssertionError | Exception e) {
- return false;
- }
- }, 10000, 200));
- waitForConsumerCount(destinationStatistics, 0);
+ Thread.sleep(2000);
includedProducer.send(test);
- long dispatched = destinationStatistics.getDispatched().getCount();
- long dequeues = destinationStatistics.getDequeues().getCount();
- long forwards = destinationStatistics.getForwards().getCount();
- assertTrue("unexpected dispatch detected",
- !Wait.waitFor(() ->
destinationStatistics.getDispatched().getCount() > dispatched
- || destinationStatistics.getDequeues().getCount() >
dequeues
- || destinationStatistics.getForwards().getCount() >
forwards,
- 2000, 100));
+ Thread.sleep(2000);
assertLocalBrokerStatistics(destinationStatistics, 1);
assertEquals("remote dest messages", 1,
remoteDestStatistics.getMessages().getCount());
@@ -1035,43 +1021,26 @@ public class VirtualConsumerDemandTest extends
DynamicNetworkTestSupport {
MessageConsumer advisoryConsumer =
getVirtualDestinationAdvisoryConsumer(testTopicName);
- assertBridgeStarted();
+ //sleep to allow the route to be set up
+ Thread.sleep(2000);
remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(),
new ActiveMQQueue("include.test.bar.bridge"), false);
- assertTrue("remote destination not ready",
- Wait.waitFor(() -> remoteBroker.getDestination(new
ActiveMQQueue("include.test.bar.bridge")) != null,
- 10000, 200));
+ Thread.sleep(2000);
//remove the virtual destinations after startup
runtimeBroker.setVirtualDestinations(new VirtualDestination[] {},
true);
- assertTrue("virtual destinations not cleared",
- Wait.waitFor(() -> {
- try {
- assertAdvisoryBrokerCounts(0, 0, 0);
- return true;
- } catch (AssertionError | Exception e) {
- return false;
- }
- }, 10000, 200));
MessageProducer includedProducer =
localSession.createProducer(included);
+ Thread.sleep(2000);
Message test = localSession.createTextMessage("test");
//assert that no message was received
//by the time we get here, there is no more virtual destinations so
this won't
//trigger demand
MessageConsumer bridgeConsumer = remoteSession.createConsumer(new
ActiveMQQueue("include.test.bar.bridge"));
- assertTrue("virtual destinations not cleared",
- Wait.waitFor(() -> {
- try {
- assertAdvisoryBrokerCounts(0, 0, 0);
- return true;
- } catch (AssertionError | Exception e) {
- return false;
- }
- }, 10000, 200));
+ Thread.sleep(2000);
includedProducer.send(test);
assertNull(bridgeConsumer.receive(5000));
@@ -1090,15 +1059,12 @@ public class VirtualConsumerDemandTest extends
DynamicNetworkTestSupport {
MessageConsumer advisoryConsumer =
getVirtualDestinationAdvisoryConsumer(testTopicName);
- assertTrue("brokers not started",
- Wait.waitFor(() -> remoteBroker.isStarted() &&
localBroker.isStarted(), 10000, 200));
+ Thread.sleep(2000);
remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(),
new ActiveMQQueue("include.test.bar.bridge"), false);
- assertTrue("remote destination not ready",
- Wait.waitFor(() -> remoteBroker.getDestination(new
ActiveMQQueue("include.test.bar.bridge")) != null,
- 10000, 200));
+ Thread.sleep(2000);
//start the local broker after establishing the virtual topic to test
replay
localBroker.addNetworkConnector(connector);
@@ -1121,32 +1087,23 @@ public class VirtualConsumerDemandTest extends
DynamicNetworkTestSupport {
MessageConsumer advisoryConsumer =
getVirtualDestinationAdvisoryConsumer(testTopicName);
- assertTrue("brokers not started",
- Wait.waitFor(() -> remoteBroker.isStarted() &&
localBroker.isStarted(), 10000, 200));
+ Thread.sleep(2000);
remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(),
new ActiveMQQueue("include.test.bar.bridge"), false);
- assertTrue("remote destination not ready",
- Wait.waitFor(() -> remoteBroker.getDestination(new
ActiveMQQueue("include.test.bar.bridge")) != null,
- 10000, 200));
+ Thread.sleep(2000);
MessageProducer includedProducer =
localSession.createProducer(included);
Message test = localSession.createTextMessage("test");
MessageConsumer bridgeConsumer = remoteSession.createConsumer(new
ActiveMQQueue("include.test.bar.bridge"));
- assertTrue("remote consumer not registered",
- Wait.waitFor(() -> remoteBroker.getDestination(new
ActiveMQQueue("include.test.bar.bridge"))
- .getConsumers().size() == 1, 10000, 200));
+ Thread.sleep(2000);
//start the local broker after establishing the virtual topic to test
replay
localBroker.addNetworkConnector(connector);
connector.start();
- assertTrue("network bridge not ready",
- Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
- 10000, 200));
- DestinationStatistics localDestinationStats =
localBroker.getDestination(included).getDestinationStatistics();
- waitForConsumerCount(localDestinationStats, 1);
+ Thread.sleep(2000);
includedProducer.send(test);
assertNotNull(bridgeConsumer.receive(5000));
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/TransportBrokerTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/TransportBrokerTestSupport.java
index 8c19ea88b8..b40f574188 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/TransportBrokerTestSupport.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/TransportBrokerTestSupport.java
@@ -40,7 +40,6 @@ public abstract class TransportBrokerTestSupport extends
BrokerTest {
protected BrokerService createBroker() throws Exception {
BrokerService service = super.createBroker();
connector = service.addConnector(getBindLocation());
- service.setBrokerName("localhost-" + System.nanoTime()); // avoid
potential timing issues between stop / start with JMX
return service;
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsDurableTopicSendReceiveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsDurableTopicSendReceiveTest.java
index bfa0b6bf41..869b10eaba 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsDurableTopicSendReceiveTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsDurableTopicSendReceiveTest.java
@@ -22,7 +22,7 @@ public class AutoNIOJmsDurableTopicSendReceiveTest extends
NIOJmsDurableTopicSen
@Override
protected String getBrokerURL() {
- return "auto+nio://localhost:0";
+ return "auto+nio://localhost:61616";
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsSendAndReceiveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsSendAndReceiveTest.java
index c3020a4058..7f0b72db65 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsSendAndReceiveTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOJmsSendAndReceiveTest.java
@@ -25,7 +25,7 @@ public class AutoNIOJmsSendAndReceiveTest extends
NIOJmsSendAndReceiveTest {
@Override
protected String getBrokerURL() {
- return "auto+nio://localhost:0";
+ return "auto+nio://localhost:61616";
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOPersistentSendAndReceiveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOPersistentSendAndReceiveTest.java
index a18e4a5f54..defe191b31 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOPersistentSendAndReceiveTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/nio/AutoNIOPersistentSendAndReceiveTest.java
@@ -22,6 +22,6 @@ public class AutoNIOPersistentSendAndReceiveTest extends
NIOPersistentSendAndRec
@Override
protected String getBrokerURL() {
- return "auto+nio://localhost:0";
+ return "auto+nio://localhost:61616";
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
index 64c9de837b..3622eb6139 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
@@ -194,13 +194,19 @@ public class FailoverDurableSubTransactionTest {
consumer = session.createDurableSubscriber(destination, "DS");
AtomicBoolean success = new AtomicBoolean(false);
+ final int maxAttempts = 6;
+ final long deadline = System.currentTimeMillis() + 60_000;
+ int attempts = 0;
HashSet<Integer> dupCheck = new HashSet<Integer>();
while (!success.get()) {
+ attempts++;
+ assertTrue("Test exceeded max attempts", attempts <= maxAttempts);
+ assertTrue("Test exceeded time budget", System.currentTimeMillis()
< deadline);
dupCheck.clear();
int i = 0;
for (i = 0; i < messageCount; i++) {
- Message msg = consumer.receive(5000);
+ Message msg = consumer.receive(2000);
if (msg == null) {
LOG.info("Failed to receive on: " + i);
break;
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
index b9e9fa59b8..84f955e2bb 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsDurableTopicSendReceiveTest.java
@@ -39,21 +39,12 @@ public class NIOJmsDurableTopicSendReceiveTest extends
JmsDurableTopicSendReceiv
}
protected ActiveMQConnectionFactory createConnectionFactory() {
- // Use the actual bound URI instead of the bind URI with port 0
- String connectUrl = getBrokerURL();
- try {
- if (broker != null && !broker.getTransportConnectors().isEmpty()) {
- connectUrl =
broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
- } catch (Exception e) {
- // Fall back to bind URL if we can't get the actual URL
- }
- ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(connectUrl);
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(getBrokerURL());
return connectionFactory;
}
protected String getBrokerURL() {
- return "nio://localhost:0";
+ return "nio://localhost:61616";
}
protected BrokerService createBroker() throws Exception {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
index 4464ee309c..0f1032cfe8 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOJmsSendAndReceiveTest.java
@@ -42,21 +42,12 @@ public class NIOJmsSendAndReceiveTest extends
JmsTopicSendReceiveWithTwoConnecti
}
protected ActiveMQConnectionFactory createConnectionFactory() {
- // Use the actual bound URI instead of the bind URI with port 0
- String connectUrl = getBrokerURL();
- try {
- if (broker != null && !broker.getTransportConnectors().isEmpty()) {
- connectUrl =
broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
- } catch (Exception e) {
- // Fall back to bind URL if we can't get the actual URL
- }
- ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(connectUrl);
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(getBrokerURL());
return connectionFactory;
}
protected String getBrokerURL() {
- return "nio://localhost:0";
+ return "nio://localhost:61616";
}
protected BrokerService createBroker() throws Exception {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
index d32537b129..9895f42a6c 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOSSLLoadTest.java
@@ -74,7 +74,7 @@ public class NIOSSLLoadTest {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
- connector =
broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
+ connector =
broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=TLS_RSA_WITH_AES_256_CBC_SHA256");
broker.start();
broker.waitUntilStarted();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java
index 5706ee67a5..ea103873e3 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOTransportBrokerTest.java
@@ -23,7 +23,7 @@ import
org.apache.activemq.transport.TransportBrokerTestSupport;
public class NIOTransportBrokerTest extends TransportBrokerTestSupport {
protected String getBindLocation() {
- return "nio://localhost:0";
+ return "nio://localhost:61616";
}
public static Test suite() {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
index f746d58ff0..4046b39644 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
@@ -24,7 +24,6 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -174,7 +173,7 @@ public class TransportUriTest extends
EmbeddedBrokerTestSupport {
@Override
protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:0";
+ bindAddress = "tcp://localhost:61616";
super.setUp();
}
@@ -199,15 +198,6 @@ public class TransportUriTest extends
EmbeddedBrokerTestSupport {
return answer;
}
- @Override
- protected void startBroker() throws Exception {
- super.startBroker();
- if (broker != null && !broker.getTransportConnectors().isEmpty()) {
- TransportConnector connector =
broker.getTransportConnectors().get(0);
- bindAddress = connector.getConnectUri().toString();
- }
- }
-
public static Test suite() {
return suite(TransportUriTest.class);
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
index 04d72ebd30..9e63771919 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
@@ -85,7 +84,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
Thread.sleep(1000);
// consume messages
- ArrayList<Message> consumeList = consumeMessages("TestQ", 5,
TimeUnit.SECONDS.toMillis(30));
+ ArrayList<Message> consumeList = consumeMessages("TestQ");
LOG.info("Consumed list " + consumeList.size());
// compare lists
@@ -283,37 +282,6 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
}
}
- private ArrayList<Message> consumeMessages(String queueName, int
expectedCount, long timeoutMs) throws Exception {
- ArrayList<Message> returnedMessages = new ArrayList<Message>();
-
- ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
- Connection connection = connectionFactory.createConnection();
- try {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(new
ActiveMQQueue(queueName));
- connection.start();
-
- long deadline = System.currentTimeMillis() + timeoutMs;
- while (returnedMessages.size() < expectedCount) {
- long remaining = deadline - System.currentTimeMillis();
- if (remaining <= 0) {
- break;
- }
- Message message = consumer.receive(Math.min(1000, remaining));
- if (message != null) {
- returnedMessages.add(message);
- }
- }
-
- consumer.close();
- return returnedMessages;
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
-
private Message consumeOneMessage(String queueName) throws Exception {
return consumeOneMessage(queueName, Session.AUTO_ACKNOWLEDGE);
}
diff --git a/pom.xml b/pom.xml
index e0590fe9a2..434662fdf1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
<hamcrest-version>1.3</hamcrest-version>
<karaf-version>4.3.7</karaf-version>
<log4j-version>2.25.3</log4j-version>
- <mockito-version>5.21.0</mockito-version>
+ <mockito-version>4.8.1</mockito-version>
<owasp-dependency-check-version>12.1.8</owasp-dependency-check-version>
<mqtt-client-version>1.16</mqtt-client-version>
<org-apache-derby-version>10.16.1.1</org-apache-derby-version>
@@ -865,6 +865,12 @@
<version>${mockito-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>${mockito-version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact