This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git

commit 20ef704a6def7119629a45d033a3605a57c42ac7
Author: Timothy Bish <[email protected]>
AuthorDate: Fri May 2 11:49:46 2025 -0400

    PROTON-2887 Remove some unused code from DeliveryQueue
    
    Remove some older unused code in the message passing delivery queue and
    clean up a bit.
---
 .../qpid/protonj2/client/util/DeliveryQueue.java   |  19 ---
 .../protonj2/client/util/FifoDeliveryQueue.java    | 153 +++++++--------------
 2 files changed, 52 insertions(+), 120 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java
index 70dcb9d5..b5a551ca 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/DeliveryQueue.java
@@ -33,14 +33,6 @@ public interface DeliveryQueue {
      */
     void enqueue(ClientDelivery delivery);
 
-    /**
-     * Adds the given {@link Delivery} to the front of the queue.
-     *
-     * @param delivery
-     *        The in-bound Delivery to enqueue.
-     */
-    void enqueueFirst(ClientDelivery delivery);
-
     /**
      * Used to get an {@link Delivery}. The amount of time this method blocks 
is based on the timeout value
      * that is supplied to it.
@@ -89,22 +81,11 @@ public interface DeliveryQueue {
      */
     void stop();
 
-    /**
-     * Closes the Delivery Queue.  No Delivery can be added or removed from 
the Queue
-     * once it has entered the closed state.
-     */
-    void close();
-
     /**
      * @return true if the Queue is not in the stopped or closed state.
      */
     boolean isRunning();
 
-    /**
-     * @return true if the Queue has been closed.
-     */
-    boolean isClosed();
-
     /**
      * @return true if there are no deliveries in the queue.
      */
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
index 2bbb8f40..a68ad9a7 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
@@ -18,7 +18,6 @@ package org.apache.qpid.protonj2.client.util;
 
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.qpid.protonj2.client.Delivery;
 import org.apache.qpid.protonj2.client.impl.ClientDelivery;
@@ -28,19 +27,11 @@ import org.apache.qpid.protonj2.client.impl.ClientDelivery;
  */
 public final class FifoDeliveryQueue implements DeliveryQueue {
 
-    private static final AtomicIntegerFieldUpdater<FifoDeliveryQueue> 
STATE_FIELD_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(FifoDeliveryQueue.class, 
"state");
-
-    private static final int CLOSED = 0;
-    private static final int STOPPED = 1;
-    private static final int RUNNING = 2;
-
-    private volatile int state = STOPPED;
+    private final Deque<ClientDelivery> queue;
 
+    private volatile boolean started;
     private int waiters = 0;
 
-    private final Deque<ClientDelivery> queue;
-
     /**
      * Creates a new first in / first out message queue with the given queue 
depth
      *
@@ -52,136 +43,96 @@ public final class FifoDeliveryQueue implements 
DeliveryQueue {
     }
 
     @Override
-    public void enqueueFirst(ClientDelivery envelope) {
-        synchronized (queue) {
-            queue.addFirst(envelope);
-            if (waiters > 0) {
-                queue.notify();
-            }
-        }
-    }
-
-    @Override
-    public void enqueue(ClientDelivery envelope) {
-        synchronized (queue) {
-            queue.addLast(envelope);
-            if (waiters > 0) {
-                queue.notify();
-            }
+    public synchronized void enqueue(ClientDelivery envelope) {
+        queue.addLast(envelope);
+        if (waiters > 0) {
+            notify();
         }
     }
 
     @Override
-    public ClientDelivery dequeue(long timeout) throws InterruptedException {
-        synchronized (queue) {
-            // Wait until the receiver is ready to deliver messages.
-            while (timeout != 0 && isRunning() && queue.isEmpty()) {
-                if (timeout == -1) {
-                    waiters++;
-                    try {
-                        queue.wait();
-                    } finally {
-                        waiters--;
-                    }
-                } else {
-                    long start = System.currentTimeMillis();
-                    waiters++;
-                    try {
-                        queue.wait(timeout);
-                    } finally {
-                        waiters--;
-                    }
-                    timeout = Math.max(timeout + start - 
System.currentTimeMillis(), 0);
+    public synchronized ClientDelivery dequeue(long timeout) throws 
InterruptedException {
+        // Wait until the receiver is ready to deliver messages.
+        while (queue.isEmpty() && timeout != 0 && started) {
+            if (timeout == -1) {
+                waiters++;
+                try {
+                    wait();
+                } finally {
+                    waiters--;
                 }
+            } else {
+                long start = System.currentTimeMillis();
+                waiters++;
+                try {
+                    wait(timeout);
+                } finally {
+                    waiters--;
+                }
+                timeout = Math.max(timeout + start - 
System.currentTimeMillis(), 0);
             }
+        }
 
-            if (!isRunning()) {
-                return null;
-            }
-
+        if (started) {
             return queue.pollFirst();
+        } else {
+            return null;
         }
     }
 
     @Override
-    public ClientDelivery dequeueNoWait() {
-        synchronized (queue) {
-            if (!isRunning()) {
-                return null;
-            }
-
+    public synchronized ClientDelivery dequeueNoWait() {
+        if (started) {
             return queue.pollFirst();
+        } else {
+            return null;
         }
     }
 
     @Override
-    public void start() {
-        if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
-            synchronized (queue) {
-                if (waiters > 0) {
-                    queue.notifyAll();
-                }
-            }
-        }
-    }
+    public synchronized void start() {
+        if (!started) {
+            started = true;
 
-    @Override
-    public void stop() {
-        if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
-            synchronized (queue) {
-                if (waiters > 0) {
-                    queue.notifyAll();
-                }
+            if (waiters > 0) {
+                notifyAll();
             }
         }
     }
 
     @Override
-    public void close() {
-        if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
-            synchronized (queue) {
-                if (waiters > 0) {
-                    queue.notifyAll();
-                }
+    public synchronized void stop() {
+        if (started) {
+            started = false;
+
+            if (waiters > 0) {
+                notifyAll();
             }
         }
     }
 
     @Override
     public boolean isRunning() {
-        return state == RUNNING;
+        return started;
     }
 
     @Override
-    public boolean isClosed() {
-        return state == CLOSED;
+    public synchronized boolean isEmpty() {
+        return queue.isEmpty();
     }
 
     @Override
-    public boolean isEmpty() {
-        synchronized (queue) {
-            return queue.isEmpty();
-        }
+    public synchronized int size() {
+        return queue.size();
     }
 
     @Override
-    public int size() {
-        synchronized (queue) {
-            return queue.size();
-        }
+    public synchronized void clear() {
+        queue.clear();
     }
 
     @Override
-    public void clear() {
-        synchronized (queue) {
-            queue.clear();
-        }
-    }
-
-    @Override
-    public String toString() {
-        synchronized (queue) {
-            return queue.toString();
-        }
+    public synchronized String toString() {
+        return queue.toString();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to