mattrpav commented on code in PR #1286:
URL: https://github.com/apache/activemq/pull/1286#discussion_r1731493459
##########
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java:
##########
@@ -217,4 +230,177 @@ public void testMemoryLimit() throws Exception {
browser.close();
assertTrue("got at least maxPageSize", received >= maxPageSize);
}
+
+ @Test // https://issues.apache.org/jira/browse/AMQ-9554
+ public void testBrowseRedeliveryMaxTransacted() throws Exception {
+
+ IndividualDeadLetterStrategy individualDeadLetterStrategy = new
IndividualDeadLetterStrategy();
+ individualDeadLetterStrategy.setQueuePrefix("");
+ individualDeadLetterStrategy.setQueueSuffix(".dlq");
+ individualDeadLetterStrategy.setUseQueueForQueueMessages(true);
+
broker.getDestinationPolicy().getDefaultEntry().setDeadLetterStrategy(individualDeadLetterStrategy);
+
broker.getDestinationPolicy().getDefaultEntry().setPersistJMSRedelivered(true);
+
+ String messageId = null;
+
+ String queueName = "browse.redeliverd.tx";
+ String dlqQueueName = "browse.redeliverd.tx.dlq";
+ String dlqDlqQueueName = "browse.redeliverd.tx.dlq.dlq";
+
+ ActiveMQQueue queue = new ActiveMQQueue(queueName +
"?consumer.prefetchSize=0");
+ ActiveMQQueue queueDLQ = new ActiveMQQueue(dlqQueueName +
"?consumer.prefetchSize=0");
+ ActiveMQQueue queueDLQDLQ = new ActiveMQQueue(dlqDlqQueueName);
+
+ broker.getAdminView().addQueue(queueName);
+ broker.getAdminView().addQueue(dlqQueueName);
+
+ DestinationView dlqQueueView =
broker.getAdminView().getBroker().getQueueView(dlqQueueName);
+ DestinationView queueView =
broker.getAdminView().getBroker().getQueueView(queueName);
+
+ verifyQueueStats(0l, 0l, 0l, dlqQueueView);
+ verifyQueueStats(0l, 0l, 0l, queueView);
+
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(queue);
+
+ Message sendMessage = session.createTextMessage("Hello world!");
+ producer.send(sendMessage);
+ messageId = sendMessage.getJMSMessageID();
+ session.commit();
+ producer.close();
+
+ verifyQueueStats(0l, 0l, 0l, dlqQueueView);
+ verifyQueueStats(1l, 0l, 1l, queueView);
+
+ // Redeliver message to DLQ
+ Message message = null;
+ MessageConsumer consumer = session.createConsumer(queue);
+ int rollbackCount = 0;
+ do {
+ message = consumer.receive(2000l);
+ if(message != null) {
+ session.rollback();
+ rollbackCount++;
+ }
+ } while (message != null);
+
+ assertEquals(Integer.valueOf(7), Integer.valueOf(rollbackCount));
+ verifyQueueStats(1l, 0l, 1l, dlqQueueView);
+ verifyQueueStats(1l, 1l, 0l, queueView);
+
+ session.commit();
+ consumer.close();
+
+ // Increment redelivery counter on the message in the DLQ
+ // Close the consumer to force broker to dispatch
+ Message messageDLQ = null;
+ MessageConsumer consumerDLQ = session.createConsumer(queueDLQ);
+ int dlqRollbackCount = 0;
+ int dlqRollbackCountLimit = 5;
+ do {
+ messageDLQ = consumerDLQ.receive(2000l);
+ if(messageDLQ != null) {
+ session.rollback();
+ session.close();
+ consumerDLQ.close();
+ session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ consumerDLQ = session.createConsumer(queueDLQ);
+ dlqRollbackCount++;
+ }
+ } while (messageDLQ != null && dlqRollbackCount <
dlqRollbackCountLimit);
+ session.commit();
+ consumerDLQ.close();
+
+ // Browse in tx mode works when we are at the edge of maxRedeliveries
+ // aka browse does not increment redeliverCounter as expected
+ Queue brokerQueueDLQ = resolveQueue(broker, queueDLQ);
+
+ for(int i=0; i<16; i++) {
+ QueueBrowser browser = session.createBrowser(queueDLQ);
+ Enumeration<?> enumeration = browser.getEnumeration();
+ ActiveMQMessage activemqMessage = null;
+ int received = 0;
+ while (enumeration.hasMoreElements()) {
+ activemqMessage = (ActiveMQMessage)enumeration.nextElement();
+ received++;
+ }
+ browser.close();
+ assertEquals(Integer.valueOf(1), Integer.valueOf(received));
+ assertEquals(Integer.valueOf(6),
Integer.valueOf(activemqMessage.getRedeliveryCounter()));
+
+ // Confirm broker-side redeliveryCounter
+ QueueMessageReference queueMessageReference =
brokerQueueDLQ.getMessage(messageId);
+ assertEquals(Integer.valueOf(6),
Integer.valueOf(queueMessageReference.getRedeliveryCounter()));
+ }
+
+ session.close();
+ connection.close();
+
+ // Change redelivery max and the browser will fail
+ factory.getRedeliveryPolicy().setMaximumRedeliveries(3);
+ final Connection browseConnection = factory.createConnection();
+ browseConnection.start();
+
+ final AtomicInteger browseCounter = new AtomicInteger(0);
+ final AtomicInteger jmsExceptionCounter = new AtomicInteger(0);
+
+ final Session browseSession = browseConnection.createSession(true,
Session.SESSION_TRANSACTED);
+
+ Thread browseThread = new Thread() {
+ public void run() {
+
+ QueueBrowser browser = null;
+ try {
+ browser = browseSession.createBrowser(queueDLQ);
+ Enumeration<?> enumeration = browser.getEnumeration();
+ while (enumeration.hasMoreElements()) {
+ Message message = (Message)enumeration.nextElement();
Review Comment:
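BUG: This blocks indefinitely (refers to the last quoted line above, the `enumeration.nextElement()` call inside the browse loop).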
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact