[
https://issues.apache.org/jira/browse/ARTEMIS-3110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17955261#comment-17955261
]
ASF subversion and git services commented on ARTEMIS-3110:
----------------------------------------------------------
Commit 781a4132ca1e6cc7ba0fc14c9ecef45df70afd44 in activemq-artemis's branch
refs/heads/main from Timothy Bish
[ https://gitbox.apache.org/repos/asf?p=activemq-artemis.git;h=781a4132ca ]
ARTEMIS-3110 Contract openwire credit window when pull times out
Ensure that outstanding credit is reduced when an openwire pull consumer
issues a pull command and no message is dispatched. Pull consumers should
only have an open credit window while the pull command window is open.
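
The credit accounting described above can be illustrated with a small toy
model. This is only a sketch under stated assumptions, not the code from the
commit: PullCreditSketch, PullConsumer, pull, dispatch and simulate are
hypothetical names, and the model assumes each pull command opens exactly one
credit. It compares a consumer that keeps the credit after a pull times out
with one that contracts it.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: none of these names come from the Artemis code base.
class PullCreditSketch {

    /** Minimal stand-in for a broker-side consumer with prefetch = 0. */
    static final class PullConsumer {
        private final boolean contractOnTimeout;
        private int credit;      // messages the broker may still dispatch
        private int dispatched;  // messages already pushed to the client

        PullConsumer(boolean contractOnTimeout) {
            this.contractOnTimeout = contractOnTimeout;
        }

        /** One pull command: open one credit, try to dispatch, then time out. */
        void pull(Queue<String> queue) {
            credit++;
            int sent = dispatch(queue);
            if (sent == 0 && contractOnTimeout) {
                credit--; // nothing was dispatched: close the window again
            }
        }

        /** Dispatch while credit and messages remain; returns the number sent. */
        int dispatch(Queue<String> queue) {
            int sent = 0;
            while (credit > 0 && !queue.isEmpty()) {
                queue.poll();
                credit--;
                dispatched++;
                sent++;
            }
            return sent;
        }
    }

    /**
     * Issues idlePulls pulls against an empty queue, then enqueues messages
     * and lets the broker dispatch. Returns how many messages reach the
     * client before it issues any further pull.
     */
    static int simulate(int idlePulls, int messages, boolean contractOnTimeout) {
        Queue<String> queue = new ArrayDeque<>();
        PullConsumer consumer = new PullConsumer(contractOnTimeout);
        for (int i = 0; i < idlePulls; i++) {
            consumer.pull(queue); // queue is empty, so every pull times out
        }
        for (int i = 0; i < messages; i++) {
            queue.add("message-" + i);
        }
        consumer.dispatch(queue); // broker reacts to the newly arrived messages
        return consumer.dispatched;
    }

    public static void main(String[] args) {
        System.out.println("without contraction: " + simulate(3, 10, false));
        System.out.println("with contraction:    " + simulate(3, 10, true));
    }
}
```

In this model, three timed-out pulls without contraction leave three credits
open, so three messages are pushed as soon as they arrive. With contraction,
nothing is pushed until the client issues a new pull.
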
> PullMessage timeout is ignored
> ------------------------------
>
> Key: ARTEMIS-3110
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3110
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: OpenWire
> Affects Versions: 2.16.0
> Reporter: Svetlana Undalova
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> I ran into an issue while using Apache Artemis (2.16).
> I have slow consumers, so I set the prefetch size to 0, which according to the
> docs makes the client pull messages. Everything works if the consumer starts
> after messages are already in the queue. However, if the consumer sits idle
> and calls consumer.Receive() N times, then the next time messages appear in
> the queue, N of them are sent to the client at once (I verified this by
> checking UnconsumedMessageCount).
> I checked the same setup with ActiveMQ (5.16), and both scenarios work
> correctly there. I think the issue may happen because the Artemis broker
> ignores the PullMessage timeout.
> Here is my code:
>
> {code:java}
> static async Task Main(string[] args)
> {
>     var connectionfactory = new Apache.NMS.ActiveMQ.ConnectionFactory("tcp://localhost:61616?connection.PrefetchPolicy.queuePrefetch=0");
>
>     var slowConsumer = ConsumerProcess("slow consumer", TimeSpan.FromSeconds(120), connectionfactory);
>     var fastConsumer = ConsumerProcess("fast consumer", TimeSpan.FromSeconds(10), connectionfactory);
>
>     // To make sure several pull requests are sent
>     await Task.Delay(TimeSpan.FromSeconds(5));
>
>     // Adding messages
>     using (var connection = connectionfactory.CreateConnection("user", "password"))
>     {
>         connection.Start();
>         using (var session = connection.CreateSession(acknowledgementMode: Apache.NMS.AcknowledgementMode.AutoAcknowledge))
>         {
>             var destination = Apache.NMS.Util.SessionUtil.GetDestination(session, "queue://cookies");
>             using (var producer = session.CreateProducer(destination))
>             {
>                 for (int i = 0; i < 10; i++)
>                 {
>                     var message = producer.CreateTextMessage();
>                     message.Text = $"{i}_{DateTimeOffset.Now}_{Guid.NewGuid()}";
>                     Console.WriteLine($"{DateTimeOffset.Now}\tSending message {message.Text}");
>                     producer.Send(message);
>                 }
>             }
>         }
>     }
>     Console.WriteLine("Press enter to exit");
>     Console.ReadLine();
> }
>
> private static async Task ConsumerProcess(string name, TimeSpan processingTime, Apache.NMS.IConnectionFactory connectionfactory)
> {
>     using (var connection = connectionfactory.CreateConnection("user", "password"))
>     {
>         connection.Start();
>         using (var session = connection.CreateSession(acknowledgementMode: Apache.NMS.AcknowledgementMode.IndividualAcknowledge))
>         {
>             var destination = Apache.NMS.Util.SessionUtil.GetDestination(session, "queue://cookies");
>             using (var consumer = session.CreateConsumer(destination))
>             {
>                 while (true)
>                 {
>                     var message = consumer.Receive(TimeSpan.FromSeconds(1));
>                     if (message is Apache.NMS.ITextMessage textMessage)
>                     {
>                         Console.WriteLine($"{DateTimeOffset.Now}\t{name} received message - {textMessage.Text}");
>                         Console.WriteLine($"{DateTimeOffset.Now}\t{name} unconsumed messages after receive - {((Apache.NMS.ActiveMQ.MessageConsumer)consumer).UnconsumedMessageCount}");
>                         await Task.Delay(processingTime);
>
>                         message.Acknowledge();
>                     }
>                     else
>                     {
>                         await Task.Delay(TimeSpan.FromSeconds(1));
>                     }
>                 }
>             }
>         }
>     }
> }
> {code}
>
>
> Here is the output:
> {code:java}
> 05.02.2021 16:04:55 +01:00 Sending message 5_05.02.2021 16:04:55 +01:00_97a6857b-d003-4c63-ba11-16a6825e6595
> 05.02.2021 16:04:55 +01:00 Sending message 6_05.02.2021 16:04:55 +01:00_4581b335-2483-42ac-a9c5-fe530a870ee1
> 05.02.2021 16:04:55 +01:00 Sending message 7_05.02.2021 16:04:55 +01:00_2ad2c9d4-aab6-47be-b51c-a1faacd6a03c
> 05.02.2021 16:04:55 +01:00 Sending message 8_05.02.2021 16:04:55 +01:00_866bca6b-9237-491b-981f-d73ae7c025fb
> 05.02.2021 16:04:55 +01:00 Sending message 9_05.02.2021 16:04:55 +01:00_a60b926c-96b2-4989-8f5f-ec85e5cfb071
> Press enter to exit
> 05.02.2021 16:04:56 +01:00 slow consumer received message - 0_05.02.2021 16:04:55 +01:00_ba96ec80-3008-4c95-987c-5f82a1aaa239
> 05.02.2021 16:04:56 +01:00 slow consumer unconsumed messages after receive - 3
> 05.02.2021 16:05:06 +01:00 fast consumer received message - 3_05.02.2021 16:04:55 +01:00_34df251c-da75-4145-8da4-efeecea3318d
> 05.02.2021 16:05:06 +01:00 fast consumer unconsumed messages after receive - 2
> 05.02.2021 16:05:17 +01:00 fast consumer received message - 5_05.02.2021 16:04:55 +01:00_97a6857b-d003-4c63-ba11-16a6825e6595
> 05.02.2021 16:05:17 +01:00 fast consumer unconsumed messages after receive - 1
> 05.02.2021 16:05:28 +01:00 fast consumer received message - 7_05.02.2021 16:04:55 +01:00_2ad2c9d4-aab6-47be-b51c-a1faacd6a03c
> 05.02.2021 16:05:28 +01:00 fast consumer unconsumed messages after receive - 0
> 05.02.2021 16:05:39 +01:00 fast consumer received message - 8_05.02.2021 16:04:55 +01:00_866bca6b-9237-491b-981f-d73ae7c025fb
> 05.02.2021 16:05:39 +01:00 fast consumer unconsumed messages after receive - 0
> 05.02.2021 16:05:50 +01:00 fast consumer received message - 9_05.02.2021 16:04:55 +01:00_a60b926c-96b2-4989-8f5f-ec85e5cfb071
> 05.02.2021 16:05:50 +01:00 fast consumer unconsumed messages after receive - 0
> 05.02.2021 16:06:57 +01:00 slow consumer received message - 2_05.02.2021 16:04:55 +01:00_86962504-1b4b-4f2e-b359-585e535042e9
> 05.02.2021 16:06:57 +01:00 slow consumer unconsumed messages after receive - 2
> 05.02.2021 16:08:58 +01:00 slow consumer received message - 4_05.02.2021 16:04:55 +01:00_2d7ce61f-0240-4278-a779-936fd4f3a3e7
> 05.02.2021 16:08:58 +01:00 slow consumer unconsumed messages after receive - 1
> 05.02.2021 16:10:59 +01:00 slow consumer received message - 6_05.02.2021 16:04:55 +01:00_4581b335-2483-42ac-a9c5-fe530a870ee1
> 05.02.2021 16:10:59 +01:00 slow consumer unconsumed messages after receive - 0
>
> {code}
> As you can see, the consumers received more than one message at a time
> (according to the unconsumed message count, which is the number of messages
> that were sent to the client). In my experiments, the number of messages
> received usually equals the number of Receive() calls I made.