jbonofre commented on code in PR #1621:
URL: https://github.com/apache/activemq/pull/1621#discussion_r2709467088
##########
activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java:
##########
@@ -129,7 +129,7 @@ public void run() {
executorService.awaitTermination(10, TimeUnit.SECONDS);
ArgumentCaptor<RemoveInfo> removeInfo =
ArgumentCaptor.forClass(RemoveInfo.class);
- Mockito.verify(transport,
times(4)).sendToActiveMQ(removeInfo.capture());
+ Mockito.verify(transport,
times(1)).sendToActiveMQ(removeInfo.capture());
Review Comment:
I'm a bit confused by the change from 4 to 1. One time is probably enough
(to speed up the tests). I wonder why we put 4 times initially.
##########
activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java:
##########
@@ -158,9 +159,32 @@ public void testClientAckWithoutAckId() throws Exception {
String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
+ // Unsubscribe immediately to prevent message redelivery while waiting
for ERROR
+ String unsubscribe = "UNSUBSCRIBE\n" + "id:1\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsubscribe);
+
+ // Receive frames until we get the ERROR frame, ignoring any MESSAGE
frames
+ // that arrive due to redelivery (especially relevant for SSL
transport)
+ StompFrame error = null;
+ for (int i = 0; i < 5; i++) {
+ error = stompConnection.receive();
Review Comment:
Same comment here about the `receive()` method.
##########
activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java:
##########
@@ -165,6 +165,22 @@ public boolean isSatisified() throws Exception {
transportConnector.start();
+ // Wait for failover to reconnect and recover() to succeed
+ // The ReconnectingXAResource should handle reconnection
transparently
Review Comment:
nit: maybe worth to mention that it's related to `maxReconnectDelay` on the
transport.
##########
activemq-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java:
##########
@@ -63,7 +63,10 @@ protected void tearDown() throws Exception {
if (broker != null) {
try {
broker.stop();
+ broker.waitUntilStopped();
Review Comment:
I guess the goal here is to avoid "conflict" between embedded brokers
between tests. just curious.
##########
activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java:
##########
@@ -165,6 +165,22 @@ public boolean isSatisified() throws Exception {
transportConnector.start();
+ // Wait for failover to reconnect and recover() to succeed
+ // The ReconnectingXAResource should handle reconnection
transparently
+ final XAResource resource = resources[0];
+ assertTrue("connection re-established and can recover",
Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ try {
+ resource.recover(100);
+ return true;
+ } catch (Exception e) {
+ // Still reconnecting
+ return false;
+ }
+ }
+ }, 30000, 500));
Review Comment:
nit: The values here are arbitrary right ? The only risk is that it could be
flaky on "slow" machines. It's reasonable though.
I would prefer to use `TimeUnit` here for clarify.
##########
activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java:
##########
@@ -565,13 +565,26 @@ public void testAckMessageWithNoId() throws Exception {
received.getHeaders().get("message-id") + "\n\n" +
Stomp.NULL;
stompConnection.sendFrame(ack);
- StompFrame error = stompConnection.receive();
- LOG.info("Received Frame: {}", error);
- assertTrue("Expected ERROR but got: " + error.getAction(),
error.getAction().equals("ERROR"));
-
+ // Unsubscribe immediately after invalid ACK to prevent message
redelivery
+ // while waiting for ERROR frame. This avoids race condition where
message
+ // could be redelivered before ERROR is received.
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" +
getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);
+
+ // Receive frames until we get the ERROR frame, ignoring any MESSAGE
frames
+ // that arrive due to redelivery (especially relevant for SSL
transport)
+ StompFrame error = null;
+ for (int i = 0; i < 5; i++) {
+ error = stompConnection.receive();
Review Comment:
nit: `stompConnection.receive()` is a blocking method afair. We can pass a
timeout to the `receive()` method. I would define a timeout on `@Test` or
define a timeout here else we can have a hanging test if something's wrong.
##########
activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java:
##########
@@ -136,7 +136,7 @@ public void testXAResourceReconnect() throws Exception {
try {
final TransportConnector transportConnector =
brokerService.getTransportConnectors().get(0);
- String failoverUrl =
String.format("failover:(%s)?maxReconnectAttempts=1",
transportConnector.getConnectUri());
+ String failoverUrl =
String.format("failover:(%s)?maxReconnectAttempts=10&initialReconnectDelay=100",
transportConnector.getConnectUri());
Review Comment:
For the record, this initial reconnect delay could have been set at code
level (in the `waitFor()` method). As we are already using
`maxReconnectAttempts`, it's fine to use `initialReconnectDelay`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact