This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 54bd735f88 QPID-8664: [Broker-J] Guava removal (1/10) 54bd735f88 is described below commit 54bd735f88db0ec36135a939dccb0216bd9dd317 Author: dakirily <daniel.kiril...@gmail.com> AuthorDate: Thu Mar 27 08:27:37 2025 +0100 QPID-8664: [Broker-J] Guava removal (1/10) This commit replaces guava ListenableFuture with JDK CompletableFuture in Broker-J transaction mechanisms --- .../store/berkeleydb/AbstractBDBMessageStore.java | 9 +++--- .../store/berkeleydb/CoalescingCommiter.java | 24 ++++++--------- .../qpid/server/store/berkeleydb/Committer.java | 6 ++-- .../server/store/berkeleydb/EnvironmentFacade.java | 6 ++-- .../berkeleydb/StandardEnvironmentFacade.java | 5 ++-- .../replication/ReplicatedEnvironmentFacade.java | 5 ++-- .../store/berkeleydb/CoalescingCommitterTest.java | 7 ++--- .../qpid/server/store/MemoryMessageStore.java | 5 ++-- .../org/apache/qpid/server/store/Transaction.java | 4 +-- .../server/txn/AsyncAutoCommitTransaction.java | 35 ++++++++++------------ .../org/apache/qpid/server/txn/AsyncCommand.java | 7 ++--- .../apache/qpid/server/txn/LocalTransaction.java | 4 +-- .../server/txn/AsyncAutoCommitTransactionTest.java | 6 ++-- .../qpid/server/txn/MockStoreTransaction.java | 8 ++--- .../qpid/server/protocol/v0_10/ServerSession.java | 4 +-- .../protocol/v0_10/ServerSessionDelegate.java | 6 ++-- .../qpid/server/protocol/v0_8/AMQChannel.java | 7 ++--- .../server/protocol/v1_0/SendingLinkEndpoint.java | 4 +-- .../v1_0/StandardReceivingLinkEndpoint.java | 4 +-- .../store/jdbc/AbstractJDBCMessageStore.java | 14 ++++----- .../jdbc/GenericAbstractJDBCMessageStore.java | 6 ++-- .../server/store/jdbc/JDBCMessageStoreTest.java | 7 ++--- 22 files changed, 86 insertions(+), 97 deletions(-) diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 1fca731aca..e5de3e0fb8 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -815,14 +816,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore } - private <X> ListenableFuture<X> commitTranAsyncImpl(final Transaction tx, X val) throws StoreException + private <X> CompletableFuture<X> commitTranAsyncImpl(final Transaction tx, X val) throws StoreException { if (tx == null) { throw new StoreException("Fatal internal error: transactional is null at commitTran"); } - ListenableFuture<X> result = getEnvironmentFacade().commitAsync(tx, val); + CompletableFuture<X> result = getEnvironmentFacade().commitAsync(tx, val); getLogger().debug("commitTranAsynImpl completed transaction {}", tx); @@ -1414,12 +1415,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public <X> ListenableFuture<X> commitTranAsync(final X val) throws StoreException + public <X> CompletableFuture<X> commitTranAsync(final X val) throws StoreException { checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - ListenableFuture<X> futureResult = AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn, val); + CompletableFuture<X> futureResult = AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn, val); doPostCommitActions(); return futureResult; } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 151c01e159..9ab0d3958f 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer } @Override - public <X> ListenableFuture<X> commitAsync(Transaction tx, X val) + public <X> CompletableFuture<X> commitAsync(Transaction tx, X val) { ThreadNotifyingSettableFuture<X> future = new ThreadNotifyingSettableFuture<>(); BDBCommitFutureResult<X> commitFuture = new BDBCommitFutureResult<>(val, future); @@ -106,13 +107,13 @@ public class CoalescingCommiter implements Committer @Override public void complete() { - _future.set(_value); + _future.complete(_value); } @Override public void abort(RuntimeException databaseException) { - _future.setException(databaseException); + _future.completeExceptionally(databaseException); } } @@ -295,7 +296,7 @@ public class CoalescingCommiter implements Committer } } - private final class ThreadNotifyingSettableFuture<X> extends AbstractFuture<X> + private final class ThreadNotifyingSettableFuture<X> extends CompletableFuture<X> { @Override public X get(final long timeout, final TimeUnit unit) @@ -319,22 +320,15 @@ public class CoalescingCommiter implements Committer } @Override - protected boolean set(final X value) + public boolean complete(final X value) { - return super.set(value); + return super.complete(value); } @Override - protected boolean setException(final Throwable throwable) + public boolean completeExceptionally(final Throwable throwable) { - return super.setException(throwable); - } - - @Override - public void addListener(final Runnable listener, final Executor exec) - { - super.addListener(listener, exec); - _commitThread.explicitNotify(); + return super.completeExceptionally(throwable); } } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index 36620ee42e..1eb37b1735 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.server.store.berkeleydb; -import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.CompletableFuture; + import com.sleepycat.je.Transaction; public interface Committer @@ -28,7 +29,8 @@ public interface Committer void start(); void commit(Transaction tx, boolean syncCommit); - <X> ListenableFuture<X> commitAsync(Transaction tx, X val); + + <X> CompletableFuture<X> commitAsync(Transaction tx, X val); void stop(); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index 5d3b8c1eca..4aa54b2b80 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Map; +import java.util.concurrent.CompletableFuture; -import com.google.common.util.concurrent.ListenableFuture; import com.sleepycat.je.CacheMode; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -62,11 +62,13 @@ public interface EnvironmentFacade Transaction beginTransaction(TransactionConfig transactionConfig); void commit(Transaction tx); - <X> ListenableFuture<X> commitAsync(Transaction tx, X val); + + <X> CompletableFuture<X> commitAsync(Transaction tx, X val); RuntimeException handleDatabaseException(String contextMessage, RuntimeException e); void closeDatabase(String name); + void close(); long getTotalLogSize(); diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 59070a10bd..12969e78c5 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -27,12 +27,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.util.concurrent.ListenableFuture; - import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -207,7 +206,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val) + public <X> CompletableFuture<X> commitAsync(final Transaction tx, final X val) { try { diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index d0193cea21..e08b0d2d01 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -36,6 +36,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -422,7 +423,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X val) + public <X> CompletableFuture<X> commitAsync(final Transaction tx, final X val) { commitInternal(tx, _realMessageStoreDurability); @@ -431,7 +432,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { return _coalescingCommiter.commitAsync(tx, val); } - return Futures.immediateFuture(val); + return CompletableFuture.completedFuture(val); } @Override diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java index a79cbbea5b..6b6c702abb 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java @@ -30,11 +30,10 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ListenableFuture; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -100,7 +99,7 @@ public class CoalescingCommitterTest extends UnitTestBase try { - ListenableFuture<?> future = _coalescingCommitter.commitAsync(null, null); + CompletableFuture<?> future = _coalescingCommitter.commitAsync(null, null); future.get(1000, TimeUnit.MILLISECONDS); fail("Async commit should fail"); } @@ -113,7 +112,7 @@ public class CoalescingCommitterTest extends UnitTestBase doNothing().when(_environmentFacade).flushLog(); final String expectedResult = "Test"; - ListenableFuture<?> future = _coalescingCommitter.commitAsync(null, expectedResult); + CompletableFuture<?> future = _coalescingCommitter.commitAsync(null, expectedResult); Object result = future.get(1000, TimeUnit.MILLISECONDS); assertEquals(expectedResult, result, "Unexpected result"); diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index af00c11e24..bb13494c07 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -67,10 +68,10 @@ public class MemoryMessageStore implements MessageStore private final Set<Xid> _localDistributedTransactionsRemoves = new HashSet<>(); @Override - public <X> ListenableFuture<X> commitTranAsync(final X val) + public <X> CompletableFuture<X> commitTranAsync(final X val) { commitTran(); - return Futures.immediateFuture(val); + return CompletableFuture.completedFuture(val); } @Override diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java index 190357a1db..0264fa204c 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.store; -import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.CompletableFuture; import org.apache.qpid.server.message.EnqueueableMessage; @@ -49,7 +49,7 @@ public interface Transaction * * @param val */ - <X> ListenableFuture<X> commitTranAsync(final X val); + <X> CompletableFuture<X> commitTranAsync(final X val); /** * Abandons all operations performed within a given transactional context. diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 76eaef7617..5445c18b97 100755 --- a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -21,9 +21,8 @@ package org.apache.qpid.server.txn; import java.util.Collection; +import java.util.concurrent.CompletableFuture; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public interface FutureRecorder { - void recordFuture(ListenableFuture<Void> future, Action action); - + void recordFuture(CompletableFuture<Void> future, Action action); } public AsyncAutoCommitTransaction(MessageStore transactionLog, FutureRecorder recorder) @@ -85,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction @Override public void addPostTransactionAction(final Action immediateAction) { - addFuture(Futures.<Void>immediateFuture(null), immediateAction); + addFuture(CompletableFuture.completedFuture(null), immediateAction); } @@ -95,7 +93,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - ListenableFuture<Void> future; + CompletableFuture<Void> future; if(record != null) { LOGGER.debug("Dequeue of message number {} from transaction log. Queue : {}", record.getMessageNumber(), record.getQueueId()); @@ -108,7 +106,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = Futures.immediateFuture(null); + future = CompletableFuture.completedFuture(null); } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -120,8 +118,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - - private void addFuture(final ListenableFuture<Void> future, final Action action) + private void addFuture(final CompletableFuture<Void> future, final Action action) { if(action != null) { @@ -136,7 +133,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - private void addEnqueueFuture(final ListenableFuture<Void> future, final Action action, boolean persistent) + private void addEnqueueFuture(final CompletableFuture<Void> future, final Action action, boolean persistent) { if(action != null) { @@ -176,15 +173,15 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - ListenableFuture<Void> future; + CompletableFuture<Void> future; if(txn != null) { - future = txn.commitTranAsync((Void) null); + future = txn.commitTranAsync(null); txn = null; } else { - future = Futures.immediateFuture(null); + future = CompletableFuture.completedFuture(null); } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -203,7 +200,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - ListenableFuture<Void> future; + CompletableFuture<Void> future; final MessageEnqueueRecord enqueueRecord; if(queue.getMessageDurability().persist(message.isPersistent())) { @@ -216,7 +213,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = Futures.immediateFuture(null); + future = CompletableFuture.completedFuture(null); enqueueRecord = null; } final EnqueueAction underlying = postTransactionAction; @@ -287,15 +284,15 @@ public class AsyncAutoCommitTransaction implements ServerTransaction i++; } - ListenableFuture<Void> future; + CompletableFuture<Void> future; if (txn != null) { - future = txn.commitTranAsync((Void) null); + future = txn.commitTranAsync(null); txn = null; } else { - future = Futures.immediateFuture(null); + future = CompletableFuture.completedFuture(null); } final EnqueueAction underlying = postTransactionAction; addEnqueueFuture(future, new Action() @@ -350,7 +347,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { if(immediatePostTransactionAction != null) { - addFuture(Futures.<Void>immediateFuture(null), new Action() + addFuture(CompletableFuture.completedFuture(null), new Action() { @Override public void postCommit() diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java index 29ce605360..2ce84ada30 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java +++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java @@ -20,18 +20,17 @@ package org.apache.qpid.server.txn; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import com.google.common.util.concurrent.ListenableFuture; - import org.apache.qpid.server.util.ServerScopedRuntimeException; public class AsyncCommand { - private final ListenableFuture<Void> _future; + private final CompletableFuture<Void> _future; private ServerTransaction.Action _action; - public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action) + public AsyncCommand(final CompletableFuture<Void> future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 3fac630d7f..e9c5d973bb 100755 --- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -26,11 +26,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +74,7 @@ public class LocalTransaction implements ServerTransaction private final MessageStore _transactionLog; private volatile long _txnStartTime = 0L; private volatile long _txnUpdateTime = 0L; - private ListenableFuture<Runnable> _asyncTran; + private CompletableFuture<Runnable> _asyncTran; private volatile boolean _outstandingWork; private final LocalTransactionState _finalState; private final Set<LocalTransactionListener> _localTransactionListeners = new CopyOnWriteArraySet<>(); diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index 93e46673bb..2318074273 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import java.util.Collections; +import java.util.concurrent.CompletableFuture; -import com.google.common.util.concurrent.ListenableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -50,7 +50,7 @@ public class AsyncAutoCommitTransactionTest extends UnitTestBase private final BaseQueue _queue = mock(BaseQueue.class); private final MessageStore _messageStore = mock(MessageStore.class); private final ServerTransaction.EnqueueAction _postTransactionAction = mock(ServerTransaction.EnqueueAction.class); - private final ListenableFuture<Void> _future = mock(ListenableFuture.class); + private final CompletableFuture<Void> _future = mock(CompletableFuture.class); private Transaction _storeTransaction; private FutureRecorder _futureRecorder; @@ -149,7 +149,7 @@ public class AsyncAutoCommitTransactionTest extends UnitTestBase asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); verifyNoInteractions(_storeTransaction); - verify(_futureRecorder).recordFuture(any(ListenableFuture.class), any(Action.class)); + verify(_futureRecorder).recordFuture(any(CompletableFuture.class), any(Action.class)); verifyNoInteractions(_postTransactionAction); } } diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index fd21135b4e..0843f0cecc 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -21,9 +21,7 @@ package org.apache.qpid.server.txn; import java.util.UUID; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.CompletableFuture; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.store.MessageEnqueueRecord; @@ -106,9 +104,9 @@ class MockStoreTransaction implements Transaction } @Override - public <X> ListenableFuture<X> commitTranAsync(final X val) + public <X> CompletableFuture<X> commitTranAsync(final X val) { - return Futures.immediateFuture(val); + return CompletableFuture.completedFuture(val); } @Override diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index e77738b5ca..c31510c25d 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListMap; @@ -62,7 +63,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; -import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1610,7 +1610,7 @@ public class ServerSession extends SessionInvoker } @Override - public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action) + public void recordFuture(final CompletableFuture<Void> future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index ab264e85a2..f57ec2fd26 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -32,10 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import com.google.common.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,7 +150,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme serverSession.accept(method.getTransfers()); if(!serverSession.isTransactional()) { - serverSession.recordFuture(Futures.<Void>immediateFuture(null), + serverSession.recordFuture(CompletableFuture.completedFuture(null), new CommandProcessedAction(serverSession, method)); } } @@ -537,7 +537,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme } else { - ssn.recordFuture(Futures.immediateFuture(null), + ssn.recordFuture(CompletableFuture.completedFuture(null), new CommandProcessedAction(ssn, xfr)); } } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 4bb581413f..877ae574f9 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -46,8 +47,6 @@ import javax.security.auth.Subject; import com.google.common.base.Function; import com.google.common.collect.Collections2; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -459,7 +458,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 { if (_confirmOnPublish) { - recordFuture(Futures.immediateFuture(null), + recordFuture(CompletableFuture.completedFuture(null), new ServerTransaction.Action() { private final long _deliveryTag = _confirmedMessageCounter; @@ -1585,7 +1584,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 } @Override - public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action) + public void recordFuture(final CompletableFuture<Void> future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java index 5081d14363..f5080a3bc6 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java @@ -29,11 +29,11 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.regex.Pattern; -import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -935,7 +935,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target> } @Override - public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action) + public void recordFuture(final CompletableFuture<Void> future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index 317f08c5c6..1ceb1dd7e1 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -32,9 +32,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; -import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -623,7 +623,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint } @Override - public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action) + public void recordFuture(final CompletableFuture<Void> future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java index cc4b7ec079..060f3f3c98 100644 --- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java +++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -51,7 +52,6 @@ import java.util.stream.Collectors; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; @@ -929,19 +929,19 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } - private <X> ListenableFuture<X> commitTranAsync(final ConnectionWrapper connWrapper, final X val) throws StoreException + private <X> CompletableFuture<X> commitTranAsync(final ConnectionWrapper connWrapper, final X val) throws StoreException { - final SettableFuture<X> future = SettableFuture.create(); + final CompletableFuture<X> future = new CompletableFuture<>(); _executor.submit(() -> { try { commitTran(connWrapper); - future.set(val); + future.complete(val); } catch (RuntimeException e) { - future.setException(e); + future.completeExceptionally(e); } }); return future; @@ -1274,11 +1274,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public <X> ListenableFuture<X> commitTranAsync(final X val) + public <X> CompletableFuture<X> commitTranAsync(final X val) { checkMessageStoreOpen(); doPreCommitActions(); - ListenableFuture<X> futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper, val); + CompletableFuture<X> futureResult = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper, val); storedSizeChange(_storeSizeIncrease); doPostCommitActions(); return futureResult; diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java index cef4f79499..541e4375d6 100644 --- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java +++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -18,13 +18,11 @@ */ package org.apache.qpid.server.store.jdbc; - import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.util.concurrent.ListenableFuture; - import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.Transaction; @@ -137,7 +135,7 @@ public abstract class GenericAbstractJDBCMessageStore extends AbstractJDBCMessag } @Override - public <X> ListenableFuture<X> commitTranAsync(final X val) + public <X> CompletableFuture<X> commitTranAsync(final X val) { try { diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index 4f359ab638..2c9e051e7d 100644 --- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -40,13 +40,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.LongStream; -import com.google.common.util.concurrent.ListenableFuture; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -148,7 +147,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase assertEquals(transactionalLogId, record.getQueueId(), "Unexpected queue id"); assertEquals(message.getMessageNumber(), record.getMessageNumber(), "Unexpected message number"); - final ListenableFuture<Void> future = transaction.commitTranAsync(null); + final CompletableFuture<Void> future = transaction.commitTranAsync(null); future.get(1000, TimeUnit.MILLISECONDS); } @@ -169,7 +168,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase final Transaction dequeueTransaction = store.newTransaction(); dequeueTransaction.dequeueMessage(record); - final ListenableFuture<Void> future = dequeueTransaction.commitTranAsync(null); + final CompletableFuture<Void> future = dequeueTransaction.commitTranAsync(null); future.get(1000, TimeUnit.MILLISECONDS); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org