This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 54bd735f88 QPID-8664: [Broker-J] Guava removal (1/10)
54bd735f88 is described below

commit 54bd735f88db0ec36135a939dccb0216bd9dd317
Author: dakirily <daniel.kiril...@gmail.com>
AuthorDate: Thu Mar 27 08:27:37 2025 +0100

    QPID-8664: [Broker-J] Guava removal (1/10)
    
    This commit replaces guava ListenableFuture with JDK CompletableFuture in 
Broker-J transaction mechanisms
---
 .../store/berkeleydb/AbstractBDBMessageStore.java  |  9 +++---
 .../store/berkeleydb/CoalescingCommiter.java       | 24 ++++++---------
 .../qpid/server/store/berkeleydb/Committer.java    |  6 ++--
 .../server/store/berkeleydb/EnvironmentFacade.java |  6 ++--
 .../berkeleydb/StandardEnvironmentFacade.java      |  5 ++--
 .../replication/ReplicatedEnvironmentFacade.java   |  5 ++--
 .../store/berkeleydb/CoalescingCommitterTest.java  |  7 ++---
 .../qpid/server/store/MemoryMessageStore.java      |  5 ++--
 .../org/apache/qpid/server/store/Transaction.java  |  4 +--
 .../server/txn/AsyncAutoCommitTransaction.java     | 35 ++++++++++------------
 .../org/apache/qpid/server/txn/AsyncCommand.java   |  7 ++---
 .../apache/qpid/server/txn/LocalTransaction.java   |  4 +--
 .../server/txn/AsyncAutoCommitTransactionTest.java |  6 ++--
 .../qpid/server/txn/MockStoreTransaction.java      |  8 ++---
 .../qpid/server/protocol/v0_10/ServerSession.java  |  4 +--
 .../protocol/v0_10/ServerSessionDelegate.java      |  6 ++--
 .../qpid/server/protocol/v0_8/AMQChannel.java      |  7 ++---
 .../server/protocol/v1_0/SendingLinkEndpoint.java  |  4 +--
 .../v1_0/StandardReceivingLinkEndpoint.java        |  4 +--
 .../store/jdbc/AbstractJDBCMessageStore.java       | 14 ++++-----
 .../jdbc/GenericAbstractJDBCMessageStore.java      |  6 ++--
 .../server/store/jdbc/JDBCMessageStoreTest.java    |  7 ++---
 22 files changed, 86 insertions(+), 97 deletions(-)

diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 1fca731aca..e5de3e0fb8 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -815,14 +816,14 @@ public abstract class AbstractBDBMessageStore implements 
MessageStore
 
     }
 
-    private <X> ListenableFuture<X> commitTranAsyncImpl(final Transaction tx, 
X val) throws StoreException
+    private <X> CompletableFuture<X> commitTranAsyncImpl(final Transaction tx, 
X val) throws StoreException
     {
         if (tx == null)
         {
             throw new StoreException("Fatal internal error: transactional is 
null at commitTran");
         }
 
-        ListenableFuture<X> result = getEnvironmentFacade().commitAsync(tx, 
val);
+        CompletableFuture<X> result = getEnvironmentFacade().commitAsync(tx, 
val);
 
         getLogger().debug("commitTranAsynImpl completed transaction {}", tx);
 
@@ -1414,12 +1415,12 @@ public abstract class AbstractBDBMessageStore 
implements MessageStore
         }
 
         @Override
-        public <X> ListenableFuture<X> commitTranAsync(final X val) throws 
StoreException
+        public <X> CompletableFuture<X> commitTranAsync(final X val) throws 
StoreException
         {
             checkMessageStoreOpen();
             doPreCommitActions();
             
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
-            ListenableFuture<X> futureResult = 
AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn, val);
+            CompletableFuture<X> futureResult = 
AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn, val);
             doPostCommitActions();
             return futureResult;
         }
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 151c01e159..9ab0d3958f 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer
     }
 
     @Override
-    public <X> ListenableFuture<X> commitAsync(Transaction tx, X val)
+    public <X> CompletableFuture<X> commitAsync(Transaction tx, X val)
     {
         ThreadNotifyingSettableFuture<X> future = new 
ThreadNotifyingSettableFuture<>();
         BDBCommitFutureResult<X> commitFuture = new 
BDBCommitFutureResult<>(val, future);
@@ -106,13 +107,13 @@ public class CoalescingCommiter implements Committer
         @Override
         public void complete()
         {
-            _future.set(_value);
+            _future.complete(_value);
         }
 
         @Override
         public void abort(RuntimeException databaseException)
         {
-            _future.setException(databaseException);
+            _future.completeExceptionally(databaseException);
         }
     }
 
@@ -295,7 +296,7 @@ public class CoalescingCommiter implements Committer
         }
     }
 
-    private final class ThreadNotifyingSettableFuture<X> extends 
AbstractFuture<X>
+    private final class ThreadNotifyingSettableFuture<X> extends 
CompletableFuture<X>
     {
         @Override
         public X get(final long timeout, final TimeUnit unit)
@@ -319,22 +320,15 @@ public class CoalescingCommiter implements Committer
         }
 
         @Override
-        protected boolean set(final X value)
+        public boolean complete(final X value)
         {
-            return super.set(value);
+            return super.complete(value);
         }
 
         @Override
-        protected boolean setException(final Throwable throwable)
+        public boolean completeExceptionally(final Throwable throwable)
         {
-            return super.setException(throwable);
-        }
-
-        @Override
-        public void addListener(final Runnable listener, final Executor exec)
-        {
-            super.addListener(listener, exec);
-            _commitThread.explicitNotify();
+            return super.completeExceptionally(throwable);
         }
     }
 
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 36620ee42e..1eb37b1735 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -20,7 +20,8 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
+
 import com.sleepycat.je.Transaction;
 
 public interface Committer
@@ -28,7 +29,8 @@ public interface Committer
     void start();
 
     void commit(Transaction tx, boolean syncCommit);
-    <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
+
+    <X> CompletableFuture<X> commitAsync(Transaction tx, X val);
 
     void stop();
 }
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index 5d3b8c1eca..4aa54b2b80 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -21,8 +21,8 @@
 package org.apache.qpid.server.store.berkeleydb;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import com.sleepycat.je.CacheMode;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
@@ -62,11 +62,13 @@ public interface EnvironmentFacade
     Transaction beginTransaction(TransactionConfig transactionConfig);
 
     void commit(Transaction tx);
-    <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
+
+    <X> CompletableFuture<X> commitAsync(Transaction tx, X val);
 
     RuntimeException handleDatabaseException(String contextMessage, 
RuntimeException e);
 
     void closeDatabase(String name);
+
     void close();
 
     long getTotalLogSize();
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 59070a10bd..12969e78c5 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -27,12 +27,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import com.sleepycat.je.CheckpointConfig;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
@@ -207,7 +206,7 @@ public class StandardEnvironmentFacade implements 
EnvironmentFacade
     }
 
     @Override
-    public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X 
val)
+    public <X> CompletableFuture<X> commitAsync(final Transaction tx, final X 
val)
     {
         try
         {
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index d0193cea21..e08b0d2d01 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -422,7 +423,7 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
     }
 
     @Override
-    public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X 
val)
+    public <X> CompletableFuture<X> commitAsync(final Transaction tx, final X 
val)
     {
         commitInternal(tx, _realMessageStoreDurability);
 
@@ -431,7 +432,7 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
         {
             return _coalescingCommiter.commitAsync(tx, val);
         }
-        return Futures.immediateFuture(val);
+        return CompletableFuture.completedFuture(val);
     }
 
     @Override
diff --git 
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
 
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
index a79cbbea5b..6b6c702abb 100644
--- 
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
+++ 
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
@@ -30,11 +30,10 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -100,7 +99,7 @@ public class CoalescingCommitterTest extends UnitTestBase
 
         try
         {
-            ListenableFuture<?> future =  
_coalescingCommitter.commitAsync(null, null);
+            CompletableFuture<?> future =  
_coalescingCommitter.commitAsync(null, null);
             future.get(1000, TimeUnit.MILLISECONDS);
             fail("Async commit should fail");
         }
@@ -113,7 +112,7 @@ public class CoalescingCommitterTest extends UnitTestBase
 
         doNothing().when(_environmentFacade).flushLog();
         final String expectedResult = "Test";
-        ListenableFuture<?> future =  _coalescingCommitter.commitAsync(null, 
expectedResult);
+        CompletableFuture<?> future =  _coalescingCommitter.commitAsync(null, 
expectedResult);
         Object result = future.get(1000, TimeUnit.MILLISECONDS);
         assertEquals(expectedResult, result, "Unexpected result");
 
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
 
b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index af00c11e24..bb13494c07 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -67,10 +68,10 @@ public class MemoryMessageStore implements MessageStore
         private final Set<Xid> _localDistributedTransactionsRemoves = new 
HashSet<>();
 
         @Override
-        public <X> ListenableFuture<X> commitTranAsync(final X val)
+        public <X> CompletableFuture<X> commitTranAsync(final X val)
         {
             commitTran();
-            return Futures.immediateFuture(val);
+            return CompletableFuture.completedFuture(val);
         }
 
         @Override
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java 
b/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
index 190357a1db..0264fa204c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.store;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 
@@ -49,7 +49,7 @@ public interface Transaction
      *
      * @param val
      */
-    <X> ListenableFuture<X> commitTranAsync(final X val);
+    <X> CompletableFuture<X> commitTranAsync(final X val);
 
     /**
      * Abandons all operations performed within a given transactional context.
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
 
b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 76eaef7617..5445c18b97 100755
--- 
a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -21,9 +21,8 @@
 package org.apache.qpid.server.txn;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,8 +55,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
 
     public interface FutureRecorder
     {
-        void recordFuture(ListenableFuture<Void> future, Action action);
-
+        void recordFuture(CompletableFuture<Void> future, Action action);
     }
 
     public AsyncAutoCommitTransaction(MessageStore transactionLog, 
FutureRecorder recorder)
@@ -85,7 +83,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
     @Override
     public void addPostTransactionAction(final Action immediateAction)
     {
-        addFuture(Futures.<Void>immediateFuture(null), immediateAction);
+        addFuture(CompletableFuture.completedFuture(null), immediateAction);
 
     }
 
@@ -95,7 +93,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
         Transaction txn = null;
         try
         {
-            ListenableFuture<Void> future;
+            CompletableFuture<Void> future;
             if(record != null)
             {
                 LOGGER.debug("Dequeue of message number {} from transaction 
log. Queue : {}", record.getMessageNumber(), record.getQueueId());
@@ -108,7 +106,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
             }
             else
             {
-                future = Futures.immediateFuture(null);
+                future = CompletableFuture.completedFuture(null);
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -120,8 +118,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
 
     }
 
-
-    private void addFuture(final ListenableFuture<Void> future, final Action 
action)
+    private void addFuture(final CompletableFuture<Void> future, final Action 
action)
     {
         if(action != null)
         {
@@ -136,7 +133,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
         }
     }
 
-    private void addEnqueueFuture(final ListenableFuture<Void> future, final 
Action action, boolean persistent)
+    private void addEnqueueFuture(final CompletableFuture<Void> future, final 
Action action, boolean persistent)
     {
         if(action != null)
         {
@@ -176,15 +173,15 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
                 }
 
             }
-            ListenableFuture<Void> future;
+            CompletableFuture<Void> future;
             if(txn != null)
             {
-                future = txn.commitTranAsync((Void) null);
+                future = txn.commitTranAsync(null);
                 txn = null;
             }
             else
             {
-                future = Futures.immediateFuture(null);
+                future = CompletableFuture.completedFuture(null);
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -203,7 +200,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
         Transaction txn = null;
         try
         {
-            ListenableFuture<Void> future;
+            CompletableFuture<Void> future;
             final MessageEnqueueRecord enqueueRecord;
             if(queue.getMessageDurability().persist(message.isPersistent()))
             {
@@ -216,7 +213,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
             }
             else
             {
-                future = Futures.immediateFuture(null);
+                future = CompletableFuture.completedFuture(null);
                 enqueueRecord = null;
             }
             final EnqueueAction underlying = postTransactionAction;
@@ -287,15 +284,15 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
                 i++;
             }
 
-            ListenableFuture<Void> future;
+            CompletableFuture<Void> future;
             if (txn != null)
             {
-                future = txn.commitTranAsync((Void) null);
+                future = txn.commitTranAsync(null);
                 txn = null;
             }
             else
             {
-                future = Futures.immediateFuture(null);
+                future = CompletableFuture.completedFuture(null);
             }
             final EnqueueAction underlying = postTransactionAction;
             addEnqueueFuture(future, new Action()
@@ -350,7 +347,7 @@ public class AsyncAutoCommitTransaction implements 
ServerTransaction
     {
         if(immediatePostTransactionAction != null)
         {
-            addFuture(Futures.<Void>immediateFuture(null), new Action()
+            addFuture(CompletableFuture.completedFuture(null), new Action()
             {
                 @Override
                 public void postCommit()
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java 
b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
index 29ce605360..2ce84ada30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
@@ -20,18 +20,17 @@
 
 package org.apache.qpid.server.txn;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class AsyncCommand
 {
-    private final ListenableFuture<Void> _future;
+    private final CompletableFuture<Void> _future;
     private ServerTransaction.Action _action;
 
-    public AsyncCommand(final ListenableFuture<Void> future, final 
ServerTransaction.Action action)
+    public AsyncCommand(final CompletableFuture<Void> future, final 
ServerTransaction.Action action)
     {
         _future = future;
         _action = action;
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java 
b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 3fac630d7f..e9c5d973bb 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -26,11 +26,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +74,7 @@ public class LocalTransaction implements ServerTransaction
     private final MessageStore _transactionLog;
     private volatile long _txnStartTime = 0L;
     private volatile long _txnUpdateTime = 0L;
-    private ListenableFuture<Runnable> _asyncTran;
+    private CompletableFuture<Runnable> _asyncTran;
     private volatile boolean _outstandingWork;
     private final LocalTransactionState _finalState;
     private final Set<LocalTransactionListener> _localTransactionListeners = 
new CopyOnWriteArraySet<>();
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
index 93e46673bb..2318074273 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -50,7 +50,7 @@ public class AsyncAutoCommitTransactionTest extends 
UnitTestBase
     private final BaseQueue _queue = mock(BaseQueue.class);
     private final MessageStore _messageStore = mock(MessageStore.class);
     private final ServerTransaction.EnqueueAction _postTransactionAction = 
mock(ServerTransaction.EnqueueAction.class);
-    private final ListenableFuture<Void> _future = 
mock(ListenableFuture.class);
+    private final CompletableFuture<Void> _future = 
mock(CompletableFuture.class);
 
     private Transaction _storeTransaction;
     private FutureRecorder _futureRecorder;
@@ -149,7 +149,7 @@ public class AsyncAutoCommitTransactionTest extends 
UnitTestBase
         asyncAutoCommitTransaction.enqueue(_queue, _message, 
_postTransactionAction);
 
         verifyNoInteractions(_storeTransaction);
-        verify(_futureRecorder).recordFuture(any(ListenableFuture.class), 
any(Action.class));
+        verify(_futureRecorder).recordFuture(any(CompletableFuture.class), 
any(Action.class));
         verifyNoInteractions(_postTransactionAction);
     }
 }
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
 
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index fd21135b4e..0843f0cecc 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -21,9 +21,7 @@
 package org.apache.qpid.server.txn;
 
 import java.util.UUID;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -106,9 +104,9 @@ class MockStoreTransaction implements Transaction
     }
 
     @Override
-    public <X> ListenableFuture<X> commitTranAsync(final X val)
+    public <X> CompletableFuture<X> commitTranAsync(final X val)
     {
-        return Futures.immediateFuture(val);
+        return CompletableFuture.completedFuture(val);
     }
 
     @Override
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index e77738b5ca..c31510c25d 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -52,6 +52,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -62,7 +63,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1610,7 +1610,7 @@ public class ServerSession extends SessionInvoker
     }
 
     @Override
-    public void recordFuture(final ListenableFuture<Void> future, final 
ServerTransaction.Action action)
+    public void recordFuture(final CompletableFuture<Void> future, final 
ServerTransaction.Action action)
     {
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
     }
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index ab264e85a2..f57ec2fd26 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -32,10 +32,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,7 +150,7 @@ public class ServerSessionDelegate extends 
MethodDelegate<ServerSession> impleme
         serverSession.accept(method.getTransfers());
         if(!serverSession.isTransactional())
         {
-            serverSession.recordFuture(Futures.<Void>immediateFuture(null),
+            serverSession.recordFuture(CompletableFuture.completedFuture(null),
                                        new 
CommandProcessedAction(serverSession, method));
         }
     }
@@ -537,7 +537,7 @@ public class ServerSessionDelegate extends 
MethodDelegate<ServerSession> impleme
                     }
                     else
                     {
-                        ssn.recordFuture(Futures.immediateFuture(null),
+                        
ssn.recordFuture(CompletableFuture.completedFuture(null),
                                          new CommandProcessedAction(ssn, xfr));
                     }
                 }
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 4bb581413f..877ae574f9 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -46,8 +47,6 @@ import javax.security.auth.Subject;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -459,7 +458,7 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                         {
                             if (_confirmOnPublish)
                             {
-                                recordFuture(Futures.immediateFuture(null),
+                                
recordFuture(CompletableFuture.completedFuture(null),
                                              new ServerTransaction.Action()
                                              {
                                                  private final long 
_deliveryTag = _confirmedMessageCounter;
@@ -1585,7 +1584,7 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     }
 
     @Override
-    public void recordFuture(final ListenableFuture<Void> future, final 
ServerTransaction.Action action)
+    public void recordFuture(final CompletableFuture<Void> future, final 
ServerTransaction.Action action)
     {
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
     }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 5081d14363..f5080a3bc6 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -29,11 +29,11 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.regex.Pattern;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -935,7 +935,7 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
     }
 
     @Override
-    public void recordFuture(final ListenableFuture<Void> future, final 
ServerTransaction.Action action)
+    public void recordFuture(final CompletableFuture<Void> future, final 
ServerTransaction.Action action)
     {
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
     }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 317f08c5c6..1ceb1dd7e1 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,9 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -623,7 +623,7 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
     }
 
     @Override
-    public void recordFuture(final ListenableFuture<Void> future, final 
ServerTransaction.Action action)
+    public void recordFuture(final CompletableFuture<Void> future, final 
ServerTransaction.Action action)
     {
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
     }
diff --git 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index cc4b7ec079..060f3f3c98 100644
--- 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -51,7 +52,6 @@ import java.util.stream.Collectors;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -929,19 +929,19 @@ public abstract class AbstractJDBCMessageStore implements 
MessageStore
         }
     }
 
-    private <X> ListenableFuture<X> commitTranAsync(final ConnectionWrapper 
connWrapper, final X val) throws StoreException
+    private <X> CompletableFuture<X> commitTranAsync(final ConnectionWrapper 
connWrapper, final X val) throws StoreException
     {
-        final SettableFuture<X> future = SettableFuture.create();
+        final CompletableFuture<X> future = new CompletableFuture<>();
         _executor.submit(() ->
         {
             try
             {
                 commitTran(connWrapper);
-                future.set(val);
+                future.complete(val);
             }
             catch (RuntimeException e)
             {
-                future.setException(e);
+                future.completeExceptionally(e);
             }
         });
         return future;
@@ -1274,11 +1274,11 @@ public abstract class AbstractJDBCMessageStore 
implements MessageStore
         }
 
         @Override
-        public <X> ListenableFuture<X> commitTranAsync(final X val)
+        public <X> CompletableFuture<X> commitTranAsync(final X val)
         {
             checkMessageStoreOpen();
             doPreCommitActions();
-            ListenableFuture<X> futureResult = 
AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper, val);
+            CompletableFuture<X> futureResult = 
AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper, val);
             storedSizeChange(_storeSizeIncrease);
             doPostCommitActions();
             return futureResult;
diff --git 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index cef4f79499..541e4375d6 100644
--- 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -18,13 +18,11 @@
  */
 package org.apache.qpid.server.store.jdbc;
 
-
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.Transaction;
@@ -137,7 +135,7 @@ public abstract class GenericAbstractJDBCMessageStore 
extends AbstractJDBCMessag
         }
 
         @Override
-        public <X> ListenableFuture<X> commitTranAsync(final X val)
+        public <X> CompletableFuture<X> commitTranAsync(final X val)
         {
             try
             {
diff --git 
a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
 
b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 4f359ab638..2c9e051e7d 100644
--- 
a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ 
b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -40,13 +40,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -148,7 +147,7 @@ public class JDBCMessageStoreTest extends 
MessageStoreTestCase
         assertEquals(transactionalLogId, record.getQueueId(), "Unexpected 
queue id");
         assertEquals(message.getMessageNumber(), record.getMessageNumber(), 
"Unexpected message number");
 
-        final ListenableFuture<Void> future = 
transaction.commitTranAsync(null);
+        final CompletableFuture<Void> future = 
transaction.commitTranAsync(null);
         future.get(1000, TimeUnit.MILLISECONDS);
     }
 
@@ -169,7 +168,7 @@ public class JDBCMessageStoreTest extends 
MessageStoreTestCase
         final Transaction dequeueTransaction = store.newTransaction();
         dequeueTransaction.dequeueMessage(record);
 
-        final ListenableFuture<Void> future = 
dequeueTransaction.commitTranAsync(null);
+        final CompletableFuture<Void> future = 
dequeueTransaction.commitTranAsync(null);
         future.get(1000, TimeUnit.MILLISECONDS);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org


Reply via email to