daves created ARTEMIS-4047: ------------------------------ Summary: Artemis does not send message to consumer AMQP Key: ARTEMIS-4047 URL: https://issues.apache.org/jira/browse/ARTEMIS-4047 Project: ActiveMQ Artemis Issue Type: Bug Components: AMQP, Broker Affects Versions: 2.25.0 Reporter: daves Attachments: 1.PNG, 2.PNG, 3.PNG, 4.PNG, 5.PNG, All.zip
The broker does not send messages from one of many existing queues to the connected consumer. According to the UI the queue does contain ~15k messages. I’m not able to consume any of these messages. I also tried to read a message using the browse function of the UI/console but that does not work eighter. The message was created by a AMQP client and should be consumed by another AMQP client. I tried to capture the situation in a few screenshots… I don’t know which data can help you to understand the situation, so I’ve collected everything: * Logs * Broker * Data Please let me know if there are any other data I should add to the ticket. I don’t think that the code of my client is relevant since the problem only exist for a single queue…but here it is anyway: {code:java} using Amqp; using Amqp.Framing; using Amqp.Types; namespace Test; public sealed class MessageConsumer { private readonly String _address; private readonly CancellationToken _cancellationToken; private readonly String _consumerName; private readonly String[] _destinations; public MessageConsumer( String address, String consumerName, String[] destinations, CancellationToken cancellationToken ) { _address = address; _consumerName = consumerName; _destinations = destinations; _cancellationToken = cancellationToken; } public async Task StartReceivingMessages() { await Task.Yield(); while ( !_cancellationToken.IsCancellationRequested ) { var connectionFactory = new ConnectionFactory(); var address = new Address( _address ); try { var connection = await connectionFactory.CreateAsync( address ); var session = ( (IConnection) connection ).CreateSession(); var receivers = new List<IReceiverLink>(); foreach ( var destination in _destinations ) { var receiver = session.CreateReceiver( $"{_consumerName}_{destination}", new Source { Address = destination, Capabilities = new[] { new Symbol( "queue" ) } } ); receivers.Add( receiver ); } while ( !_cancellationToken.IsCancellationRequested ) foreach ( var receiver in receivers ) { // ReceiveAsync( TimeSpan.Zero ); blocks forever and no messages will be received var message = await receiver.ReceiveAsync( TimeSpan.FromMilliseconds( 1 ) ); if ( message == null ) continue; receiver.Accept( message ); Console.WriteLine( $"{_consumerName} - Received message with id: '{message.Properties.MessageId}'" ); } } catch ( Exception ex ) { Console.WriteLine( $"{_consumerName} - Connection error in producer '{_consumerName}' {ex.Message} => create new connection." ); await Task.Delay( 1000, CancellationToken.None ); } } } } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)