[ 
https://issues.apache.org/jira/browse/ARTEMIS-5717?focusedWorklogId=988456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-988456
 ]

ASF GitHub Bot logged work on ARTEMIS-5717:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Oct/25 18:27
            Start Date: 22/Oct/25 18:27
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #6000:
URL: https://github.com/apache/activemq-artemis/pull/6000#discussion_r2452989230


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRoutingTypeMismatchTest.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeAddressPolicyElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeBrokerConnectionElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeQueuePolicyElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPReceiverBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_ATTACH_TIMEOUT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LINK_RECOVERY_INITIAL_DELAY;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPRoutingTypeMismatchTest extends AmqpTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   protected static final int AMQP_PORT_2 = 5673;
+   public static final int TIME_BEFORE_RESTART = 1000;
+
+   @Test
+   public void testMismatchOnReceiver() throws Exception {
+
+      ActiveMQServer server = createServer(AMQP_PORT, false);
+      server.getConfiguration().getAddressSettings().clear();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of("topic.DLQ")).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server.getConfiguration().addQueueConfiguration(QueueConfiguration.of("topic.DLQ").setRoutingType(RoutingType.ANYCAST));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(getName()).addRoutingType(RoutingType.MULTICAST));
+      server.setIdentity("Server1");
+      server.start();
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      long nmessages = 10;
+
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("myID");
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
session.createDurableConsumer(session.createTopic(getName()), "myTopic");
+         MessageProducer producer = 
session.createProducer(session.createTopic(getName()));
+         for (int i = 0; i < nmessages; i++) {
+            producer.send(session.createTextMessage("hello"));
+         }
+         session.commit();
+         connection.start();
+         for (int i = 0; i < nmessages; i++) {
+            assertNotNull(consumer.receive(5000));
+         }
+         session.rollback();
+         assertNull(consumer.receive(100));
+      }
+
+      Queue dlq = server.locateQueue("topic.DLQ");
+      Wait.assertEquals(nmessages, dlq::getMessageCount, 5000, 100);
+
+      ActiveMQServer server2 = createServer(AMQP_PORT_2, false);
+      server2.getConfiguration().getAddressSettings().clear();
+      server2.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of("topic.DLQ")).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server2.getConfiguration().addQueueConfiguration(QueueConfiguration.of("topic.DLQ").setRoutingType(RoutingType.ANYCAST));
+      server2.setIdentity("Server2");
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+      amqpConnection.addReceiver((AMQPReceiverBrokerConnectionElement) new 
AMQPReceiverBrokerConnectionElement().setMatchAddress("#.DLQ"));
+      server2.getConfiguration().addAMQPConnection(amqpConnection);
+      server2.start();
+
+      Queue dlqServer2 = server2.locateQueue("topic.DLQ");
+      Wait.assertEquals(nmessages, dlqServer2::getMessageCount, 5000, 100);
+   }
+
+
+   @Test
+   public void testMismatchOnBridge() throws Exception {
+
+      ActiveMQServer server = createServer(AMQP_PORT, false);
+      server.getConfiguration().getAddressSettings().clear();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of("topic.DLQ")).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server.getConfiguration().addQueueConfiguration(QueueConfiguration.of("topic.DLQ").setRoutingType(RoutingType.ANYCAST));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(getName()).addRoutingType(RoutingType.MULTICAST));
+      server.setIdentity("Server1");
+      server.start();
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      long nmessages = 10;
+
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("myID");
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
session.createDurableConsumer(session.createTopic(getName()), "myTopic");
+         MessageProducer producer = 
session.createProducer(session.createTopic(getName()));
+         for (int i = 0; i < nmessages; i++) {
+            producer.send(session.createTextMessage("hello"));
+         }
+         session.commit();
+         connection.start();
+         for (int i = 0; i < nmessages; i++) {
+            assertNotNull(consumer.receive(5000));
+         }
+         session.rollback();
+         assertNull(consumer.receive(100));
+      }
+
+      Queue dlq = server.locateQueue("topic.DLQ");
+      Wait.assertEquals(nmessages, dlq::getMessageCount, 5000, 100);
+
+      ActiveMQServer server2 = createServer(AMQP_PORT_2, false);
+      server2.getConfiguration().getAddressSettings().clear();
+      server2.getConfiguration().addAddressSetting("#", new 
AddressSettings().setDeadLetterAddress(SimpleString.of("topic.DLQ")).setRedeliveryDelay(0).setMaxDeliveryAttempts(1));
+      
server2.getConfiguration().addQueueConfiguration(QueueConfiguration.of("topic.DLQ").setRoutingType(RoutingType.ANYCAST));
+      server2.setIdentity("Server2");
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+      amqpConnection.addBridge((AMQPBridgeBrokerConnectionElement) new 
AMQPBridgeBrokerConnectionElement().addBridgeFromAddressPolicy(new 
AMQPBridgeAddressPolicyElement().setRemoteAddress("topic.DLQ").setName("topic.DLQ")).addBridgeToQueuePolicy(new
 
AMQPBridgeQueuePolicyElement().setRemoteAddress("topic.DLQ").setName("topic.DLQ")).setMatchAddress("topic.DLQ"));

Review Comment:
   @tabish121 the issue is that message.getRoutingType() does not match the 
session.send routing type.
   I wanted to check if this is an issue with Bridge as well...  I am not sure 
I am configuring this properly, or if I actually found an issue.
   
   I tried this in multiple ways... Could you help me figure out what I'm 
doing wrong and how I should configure this Bridge (to pull messages from the 
remote DLQ into the local DLQ)?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 988456)
    Time Spent: 20m  (was: 10m)

> BrokerConnection Receiver would lose messages on routing type mismatch
> ----------------------------------------------------------------------
>
>                 Key: ARTEMIS-5717
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5717
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.44.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to