HavretGC commented on a change in pull request #4: [WIP] Failover implementation
URL: https://github.com/apache/activemq-nms-amqp/pull/4#discussion_r302229852
##########
File path: src/NMS.AMQP/Util/PriorityMessageQueue.cs
##########
@@ -0,0 +1,114 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.AMQP.Message;
+
+namespace Apache.NMS.AMQP.Util
+{
+ public class PriorityMessageQueue
+ {
+ private readonly LinkedList<InboundMessageDispatch>[] lists;
+
+ private readonly object syncRoot = new object();
+
+ private int count;
+
+ /// <summary>
+ /// Creates an empty queue with one linked list per priority level,
+ /// indexed from 0 up to and including <c>MsgPriority.Highest</c>.
+ /// </summary>
+ public PriorityMessageQueue()
+ {
+     int priorityLevels = (int) MsgPriority.Highest + 1;
+     lists = new LinkedList<InboundMessageDispatch>[priorityLevels];
+
+     // Pre-allocate every bucket so later lookups never hit a null slot.
+     for (int level = 0; level < priorityLevels; level++)
+     {
+         lists[level] = new LinkedList<InboundMessageDispatch>();
+     }
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether <see cref="Count"/> is currently zero.
+ /// </summary>
+ public bool IsEmpty
+ {
+     get { return Count == 0; }
+ }
+
+ /// <summary>
+ /// Gets the number of queued envelopes across all priority levels.
+ /// The counter is read while holding <c>syncRoot</c>.
+ /// </summary>
+ public int Count
+ {
+     get
+     {
+         int snapshot;
+         lock (syncRoot)
+         {
+             snapshot = count;
+         }
+
+         return snapshot;
+     }
+ }
+
+ /// <summary>
+ /// Dequeues by delegating to <c>Dequeue</c> with a timeout of 0.
+ /// NOTE(review): presumably this returns immediately (null when nothing is
+ /// queued) - confirm against the <c>Dequeue</c> implementation.
+ /// </summary>
+ /// <returns>Whatever <c>Dequeue(0)</c> returns.</returns>
+ public InboundMessageDispatch DequeueNoWait() => Dequeue(0);
+
+ /// <summary>
+ /// Removes and returns the head of the highest-priority non-empty list.
+ /// This method takes no lock itself.
+ /// NOTE(review): callers are presumably expected to hold <c>syncRoot</c> - confirm.
+ /// </summary>
+ /// <returns>The removed envelope, or <c>null</c> when nothing is queued.</returns>
+ private InboundMessageDispatch RemoveFirst()
+ {
+     if (count <= 0)
+     {
+         return null;
+     }
+
+     // Scan from the highest priority down to 0 and take the first hit.
+     for (int level = (int) MsgPriority.Highest; level >= 0; level--)
+     {
+         LinkedList<InboundMessageDispatch> bucket = lists[level];
+         if (bucket.Count == 0)
+         {
+             continue;
+         }
+
+         InboundMessageDispatch head = bucket.First.Value;
+         bucket.RemoveFirst();
+         count--;
+         return head;
+     }
+
+     return null;
+ }
+
+ /// <summary>
+ /// Appends <paramref name="envelope"/> to the tail of the list matching its
+ /// message priority, then pulses <c>syncRoot</c> to wake one waiting thread.
+ /// </summary>
+ /// <param name="envelope">The dispatch to queue; its <c>Message.NMSPriority</c> selects the list.</param>
+ public void Enqueue(InboundMessageDispatch envelope)
+ {
+     lock (syncRoot)
+     {
+         LinkedList<InboundMessageDispatch> target = GetList(envelope);
+         target.AddLast(envelope);
+         count++;
+         Monitor.Pulse(syncRoot);
+     }
+ }
+
+ /// <summary>
+ /// Inserts <paramref name="envelope"/> at the head of the highest-priority
+ /// list, without consulting the message's own priority, then pulses
+ /// <c>syncRoot</c> to wake one waiting thread.
+ /// </summary>
+ /// <param name="envelope">The dispatch to place at the front of the queue.</param>
+ public void EnqueueFirst(InboundMessageDispatch envelope)
+ {
+     lock (syncRoot)
+     {
+         LinkedList<InboundMessageDispatch> highest = lists[(int) MsgPriority.Highest];
+         highest.AddFirst(envelope);
+         count++;
+         Monitor.Pulse(syncRoot);
+     }
+ }
+
+ /// <summary>
+ /// Looks up the list for the priority carried by the envelope's message.
+ /// </summary>
+ /// <param name="envelope">The dispatch whose <c>Message.NMSPriority</c> is used as the index.</param>
+ /// <returns>The list stored at that priority's index in <c>lists</c>.</returns>
+ private LinkedList<InboundMessageDispatch> GetList(InboundMessageDispatch envelope) =>
+     lists[(int) envelope.Message.NMSPriority];
+
+ public InboundMessageDispatch Dequeue(int timeout)
Review comment:
PriorityMessageQueue now notifies the waiting thread when the consumer is
being closed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services