This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 54bd735f88 QPID-8664: [Broker-J] Guava removal (1/10)
54bd735f88 is described below
commit 54bd735f88db0ec36135a939dccb0216bd9dd317
Author: dakirily <[email protected]>
AuthorDate: Thu Mar 27 08:27:37 2025 +0100
QPID-8664: [Broker-J] Guava removal (1/10)
This commit replaces guava ListenableFuture with JDK CompletableFuture in
Broker-J transaction mechanisms
---
.../store/berkeleydb/AbstractBDBMessageStore.java | 9 +++---
.../store/berkeleydb/CoalescingCommiter.java | 24 ++++++---------
.../qpid/server/store/berkeleydb/Committer.java | 6 ++--
.../server/store/berkeleydb/EnvironmentFacade.java | 6 ++--
.../berkeleydb/StandardEnvironmentFacade.java | 5 ++--
.../replication/ReplicatedEnvironmentFacade.java | 5 ++--
.../store/berkeleydb/CoalescingCommitterTest.java | 7 ++---
.../qpid/server/store/MemoryMessageStore.java | 5 ++--
.../org/apache/qpid/server/store/Transaction.java | 4 +--
.../server/txn/AsyncAutoCommitTransaction.java | 35 ++++++++++------------
.../org/apache/qpid/server/txn/AsyncCommand.java | 7 ++---
.../apache/qpid/server/txn/LocalTransaction.java | 4 +--
.../server/txn/AsyncAutoCommitTransactionTest.java | 6 ++--
.../qpid/server/txn/MockStoreTransaction.java | 8 ++---
.../qpid/server/protocol/v0_10/ServerSession.java | 4 +--
.../protocol/v0_10/ServerSessionDelegate.java | 6 ++--
.../qpid/server/protocol/v0_8/AMQChannel.java | 7 ++---
.../server/protocol/v1_0/SendingLinkEndpoint.java | 4 +--
.../v1_0/StandardReceivingLinkEndpoint.java | 4 +--
.../store/jdbc/AbstractJDBCMessageStore.java | 14 ++++-----
.../jdbc/GenericAbstractJDBCMessageStore.java | 6 ++--
.../server/store/jdbc/JDBCMessageStoreTest.java | 7 ++---
22 files changed, 86 insertions(+), 97 deletions(-)
diff --git
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 1fca731aca..e5de3e0fb8 100644
---
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -815,14 +816,14 @@ public abstract class AbstractBDBMessageStore implements
MessageStore
}
- private <X> ListenableFuture<X> commitTranAsyncImpl(final Transaction tx,
X val) throws StoreException
+ private <X> CompletableFuture<X> commitTranAsyncImpl(final Transaction tx,
X val) throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is
null at commitTran");
}
- ListenableFuture<X> result = getEnvironmentFacade().commitAsync(tx,
val);
+ CompletableFuture<X> result = getEnvironmentFacade().commitAsync(tx,
val);
getLogger().debug("commitTranAsynImpl completed transaction {}", tx);
@@ -1414,12 +1415,12 @@ public abstract class AbstractBDBMessageStore
implements MessageStore
}
@Override
- public <X> ListenableFuture<X> commitTranAsync(final X val) throws
StoreException
+ public <X> CompletableFuture<X> commitTranAsync(final X val) throws
StoreException
{
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- ListenableFuture<X> futureResult =
AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn, val);
+ CompletableFuture<X> futureResult =
AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn, val);
doPostCommitActions();
return futureResult;
}
diff --git
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 151c01e159..9ab0d3958f 100644
---
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer
}
@Override
- public <X> ListenableFuture<X> commitAsync(Transaction tx, X val)
+ public <X> CompletableFuture<X> commitAsync(Transaction tx, X val)
{
ThreadNotifyingSettableFuture<X> future = new
ThreadNotifyingSettableFuture<>();
BDBCommitFutureResult<X> commitFuture = new
BDBCommitFutureResult<>(val, future);
@@ -106,13 +107,13 @@ public class CoalescingCommiter implements Committer
@Override
public void complete()
{
- _future.set(_value);
+ _future.complete(_value);
}
@Override
public void abort(RuntimeException databaseException)
{
- _future.setException(databaseException);
+ _future.completeExceptionally(databaseException);
}
}
@@ -295,7 +296,7 @@ public class CoalescingCommiter implements Committer
}
}
- private final class ThreadNotifyingSettableFuture<X> extends
AbstractFuture<X>
+ private final class ThreadNotifyingSettableFuture<X> extends
CompletableFuture<X>
{
@Override
public X get(final long timeout, final TimeUnit unit)
@@ -319,22 +320,15 @@ public class CoalescingCommiter implements Committer
}
@Override
- protected boolean set(final X value)
+ public boolean complete(final X value)
{
- return super.set(value);
+ return super.complete(value);
}
@Override
- protected boolean setException(final Throwable throwable)
+ public boolean completeExceptionally(final Throwable throwable)
{
- return super.setException(throwable);
- }
-
- @Override
- public void addListener(final Runnable listener, final Executor exec)
- {
- super.addListener(listener, exec);
- _commitThread.explicitNotify();
+ return super.completeExceptionally(throwable);
}
}
diff --git
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 36620ee42e..1eb37b1735 100644
---
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
+
import com.sleepycat.je.Transaction;
public interface Committer
@@ -28,7 +29,8 @@ public interface Committer
void start();
void commit(Transaction tx, boolean syncCommit);
- <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
+
+ <X> CompletableFuture<X> commitAsync(Transaction tx, X val);
void stop();
}
diff --git
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index 5d3b8c1eca..4aa54b2b80 100644
---
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.store.berkeleydb;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
-import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -62,11 +62,13 @@ public interface EnvironmentFacade
Transaction beginTransaction(TransactionConfig transactionConfig);
void commit(Transaction tx);
- <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
+
+ <X> CompletableFuture<X> commitAsync(Transaction tx, X val);
RuntimeException handleDatabaseException(String contextMessage,
RuntimeException e);
void closeDatabase(String name);
+
void close();
long getTotalLogSize();
diff --git
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 59070a10bd..12969e78c5 100644
---
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -27,12 +27,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.util.concurrent.ListenableFuture;
-
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -207,7 +206,7 @@ public class StandardEnvironmentFacade implements
EnvironmentFacade
}
@Override
- public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X
val)
+ public <X> CompletableFuture<X> commitAsync(final Transaction tx, final X
val)
{
try
{
diff --git
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index d0193cea21..e08b0d2d01 100644
---
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -422,7 +423,7 @@ public class ReplicatedEnvironmentFacade implements
EnvironmentFacade, StateChan
}
@Override
- public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X
val)
+ public <X> CompletableFuture<X> commitAsync(final Transaction tx, final X
val)
{
commitInternal(tx, _realMessageStoreDurability);
@@ -431,7 +432,7 @@ public class ReplicatedEnvironmentFacade implements
EnvironmentFacade, StateChan
{
return _coalescingCommiter.commitAsync(tx, val);
}
- return Futures.immediateFuture(val);
+ return CompletableFuture.completedFuture(val);
}
@Override
diff --git
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
index a79cbbea5b..6b6c702abb 100644
---
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
+++
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java
@@ -30,11 +30,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import com.google.common.util.concurrent.ListenableFuture;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -100,7 +99,7 @@ public class CoalescingCommitterTest extends UnitTestBase
try
{
- ListenableFuture<?> future =
_coalescingCommitter.commitAsync(null, null);
+ CompletableFuture<?> future =
_coalescingCommitter.commitAsync(null, null);
future.get(1000, TimeUnit.MILLISECONDS);
fail("Async commit should fail");
}
@@ -113,7 +112,7 @@ public class CoalescingCommitterTest extends UnitTestBase
doNothing().when(_environmentFacade).flushLog();
final String expectedResult = "Test";
- ListenableFuture<?> future = _coalescingCommitter.commitAsync(null,
expectedResult);
+ CompletableFuture<?> future = _coalescingCommitter.commitAsync(null,
expectedResult);
Object result = future.get(1000, TimeUnit.MILLISECONDS);
assertEquals(expectedResult, result, "Unexpected result");
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index af00c11e24..bb13494c07 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -67,10 +68,10 @@ public class MemoryMessageStore implements MessageStore
private final Set<Xid> _localDistributedTransactionsRemoves = new
HashSet<>();
@Override
- public <X> ListenableFuture<X> commitTranAsync(final X val)
+ public <X> CompletableFuture<X> commitTranAsync(final X val)
{
commitTran();
- return Futures.immediateFuture(val);
+ return CompletableFuture.completedFuture(val);
}
@Override
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
b/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
index 190357a1db..0264fa204c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.store;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -49,7 +49,7 @@ public interface Transaction
*
* @param val
*/
- <X> ListenableFuture<X> commitTranAsync(final X val);
+ <X> CompletableFuture<X> commitTranAsync(final X val);
/**
* Abandons all operations performed within a given transactional context.
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 76eaef7617..5445c18b97 100755
---
a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -21,9 +21,8 @@
package org.apache.qpid.server.txn;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,8 +55,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
public interface FutureRecorder
{
- void recordFuture(ListenableFuture<Void> future, Action action);
-
+ void recordFuture(CompletableFuture<Void> future, Action action);
}
public AsyncAutoCommitTransaction(MessageStore transactionLog,
FutureRecorder recorder)
@@ -85,7 +83,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
@Override
public void addPostTransactionAction(final Action immediateAction)
{
- addFuture(Futures.<Void>immediateFuture(null), immediateAction);
+ addFuture(CompletableFuture.completedFuture(null), immediateAction);
}
@@ -95,7 +93,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
Transaction txn = null;
try
{
- ListenableFuture<Void> future;
+ CompletableFuture<Void> future;
if(record != null)
{
LOGGER.debug("Dequeue of message number {} from transaction
log. Queue : {}", record.getMessageNumber(), record.getQueueId());
@@ -108,7 +106,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
}
else
{
- future = Futures.immediateFuture(null);
+ future = CompletableFuture.completedFuture(null);
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -120,8 +118,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
}
-
- private void addFuture(final ListenableFuture<Void> future, final Action
action)
+ private void addFuture(final CompletableFuture<Void> future, final Action
action)
{
if(action != null)
{
@@ -136,7 +133,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
}
}
- private void addEnqueueFuture(final ListenableFuture<Void> future, final
Action action, boolean persistent)
+ private void addEnqueueFuture(final CompletableFuture<Void> future, final
Action action, boolean persistent)
{
if(action != null)
{
@@ -176,15 +173,15 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
}
}
- ListenableFuture<Void> future;
+ CompletableFuture<Void> future;
if(txn != null)
{
- future = txn.commitTranAsync((Void) null);
+ future = txn.commitTranAsync(null);
txn = null;
}
else
{
- future = Futures.immediateFuture(null);
+ future = CompletableFuture.completedFuture(null);
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -203,7 +200,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
Transaction txn = null;
try
{
- ListenableFuture<Void> future;
+ CompletableFuture<Void> future;
final MessageEnqueueRecord enqueueRecord;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
@@ -216,7 +213,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
}
else
{
- future = Futures.immediateFuture(null);
+ future = CompletableFuture.completedFuture(null);
enqueueRecord = null;
}
final EnqueueAction underlying = postTransactionAction;
@@ -287,15 +284,15 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
i++;
}
- ListenableFuture<Void> future;
+ CompletableFuture<Void> future;
if (txn != null)
{
- future = txn.commitTranAsync((Void) null);
+ future = txn.commitTranAsync(null);
txn = null;
}
else
{
- future = Futures.immediateFuture(null);
+ future = CompletableFuture.completedFuture(null);
}
final EnqueueAction underlying = postTransactionAction;
addEnqueueFuture(future, new Action()
@@ -350,7 +347,7 @@ public class AsyncAutoCommitTransaction implements
ServerTransaction
{
if(immediatePostTransactionAction != null)
{
- addFuture(Futures.<Void>immediateFuture(null), new Action()
+ addFuture(CompletableFuture.completedFuture(null), new Action()
{
@Override
public void postCommit()
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
index 29ce605360..2ce84ada30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncCommand.java
@@ -20,18 +20,17 @@
package org.apache.qpid.server.txn;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class AsyncCommand
{
- private final ListenableFuture<Void> _future;
+ private final CompletableFuture<Void> _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final ListenableFuture<Void> future, final
ServerTransaction.Action action)
+ public AsyncCommand(final CompletableFuture<Void> future, final
ServerTransaction.Action action)
{
_future = future;
_action = action;
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 3fac630d7f..e9c5d973bb 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -26,11 +26,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +74,7 @@ public class LocalTransaction implements ServerTransaction
private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0L;
- private ListenableFuture<Runnable> _asyncTran;
+ private CompletableFuture<Runnable> _asyncTran;
private volatile boolean _outstandingWork;
private final LocalTransactionState _finalState;
private final Set<LocalTransactionListener> _localTransactionListeners =
new CopyOnWriteArraySet<>();
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
b/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
index 93e46673bb..2318074273 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
-import com.google.common.util.concurrent.ListenableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -50,7 +50,7 @@ public class AsyncAutoCommitTransactionTest extends
UnitTestBase
private final BaseQueue _queue = mock(BaseQueue.class);
private final MessageStore _messageStore = mock(MessageStore.class);
private final ServerTransaction.EnqueueAction _postTransactionAction =
mock(ServerTransaction.EnqueueAction.class);
- private final ListenableFuture<Void> _future =
mock(ListenableFuture.class);
+ private final CompletableFuture<Void> _future =
mock(CompletableFuture.class);
private Transaction _storeTransaction;
private FutureRecorder _futureRecorder;
@@ -149,7 +149,7 @@ public class AsyncAutoCommitTransactionTest extends
UnitTestBase
asyncAutoCommitTransaction.enqueue(_queue, _message,
_postTransactionAction);
verifyNoInteractions(_storeTransaction);
- verify(_futureRecorder).recordFuture(any(ListenableFuture.class),
any(Action.class));
+ verify(_futureRecorder).recordFuture(any(CompletableFuture.class),
any(Action.class));
verifyNoInteractions(_postTransactionAction);
}
}
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index fd21135b4e..0843f0cecc 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -21,9 +21,7 @@
package org.apache.qpid.server.txn;
import java.util.UUID;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CompletableFuture;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -106,9 +104,9 @@ class MockStoreTransaction implements Transaction
}
@Override
- public <X> ListenableFuture<X> commitTranAsync(final X val)
+ public <X> CompletableFuture<X> commitTranAsync(final X val)
{
- return Futures.immediateFuture(val);
+ return CompletableFuture.completedFuture(val);
}
@Override
diff --git
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index e77738b5ca..c31510c25d 100644
---
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -52,6 +52,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -62,7 +63,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
-import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1610,7 +1610,7 @@ public class ServerSession extends SessionInvoker
}
@Override
- public void recordFuture(final ListenableFuture<Void> future, final
ServerTransaction.Action action)
+ public void recordFuture(final CompletableFuture<Void> future, final
ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
diff --git
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index ab264e85a2..f57ec2fd26 100644
---
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -32,10 +32,10 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -150,7 +150,7 @@ public class ServerSessionDelegate extends
MethodDelegate<ServerSession> impleme
serverSession.accept(method.getTransfers());
if(!serverSession.isTransactional())
{
- serverSession.recordFuture(Futures.<Void>immediateFuture(null),
+ serverSession.recordFuture(CompletableFuture.completedFuture(null),
new
CommandProcessedAction(serverSession, method));
}
}
@@ -537,7 +537,7 @@ public class ServerSessionDelegate extends
MethodDelegate<ServerSession> impleme
}
else
{
- ssn.recordFuture(Futures.immediateFuture(null),
+
ssn.recordFuture(CompletableFuture.completedFuture(null),
new CommandProcessedAction(ssn, xfr));
}
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 4bb581413f..877ae574f9 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -46,8 +47,6 @@ import javax.security.auth.Subject;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -459,7 +458,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
{
if (_confirmOnPublish)
{
- recordFuture(Futures.immediateFuture(null),
+
recordFuture(CompletableFuture.completedFuture(null),
new ServerTransaction.Action()
{
private final long
_deliveryTag = _confirmedMessageCounter;
@@ -1585,7 +1584,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
@Override
- public void recordFuture(final ListenableFuture<Void> future, final
ServerTransaction.Action action)
+ public void recordFuture(final CompletableFuture<Void> future, final
ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 5081d14363..f5080a3bc6 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -29,11 +29,11 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Pattern;
-import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -935,7 +935,7 @@ public class SendingLinkEndpoint extends
AbstractLinkEndpoint<Source, Target>
}
@Override
- public void recordFuture(final ListenableFuture<Void> future, final
ServerTransaction.Action action)
+ public void recordFuture(final CompletableFuture<Void> future, final
ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 317f08c5c6..1ceb1dd7e1 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,9 +32,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
-import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -623,7 +623,7 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
}
@Override
- public void recordFuture(final ListenableFuture<Void> future, final
ServerTransaction.Action action)
+ public void recordFuture(final CompletableFuture<Void> future, final
ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
diff --git
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index cc4b7ec079..060f3f3c98 100644
---
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
@@ -51,7 +52,6 @@ import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -929,19 +929,19 @@ public abstract class AbstractJDBCMessageStore implements
MessageStore
}
}
- private <X> ListenableFuture<X> commitTranAsync(final ConnectionWrapper
connWrapper, final X val) throws StoreException
+ private <X> CompletableFuture<X> commitTranAsync(final ConnectionWrapper
connWrapper, final X val) throws StoreException
{
- final SettableFuture<X> future = SettableFuture.create();
+ final CompletableFuture<X> future = new CompletableFuture<>();
_executor.submit(() ->
{
try
{
commitTran(connWrapper);
- future.set(val);
+ future.complete(val);
}
catch (RuntimeException e)
{
- future.setException(e);
+ future.completeExceptionally(e);
}
});
return future;
@@ -1274,11 +1274,11 @@ public abstract class AbstractJDBCMessageStore
implements MessageStore
}
@Override
- public <X> ListenableFuture<X> commitTranAsync(final X val)
+ public <X> CompletableFuture<X> commitTranAsync(final X val)
{
checkMessageStoreOpen();
doPreCommitActions();
- ListenableFuture<X> futureResult =
AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper, val);
+ CompletableFuture<X> futureResult =
AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper, val);
storedSizeChange(_storeSizeIncrease);
doPostCommitActions();
return futureResult;
diff --git
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index cef4f79499..541e4375d6 100644
---
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -18,13 +18,11 @@
*/
package org.apache.qpid.server.store.jdbc;
-
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.Transaction;
@@ -137,7 +135,7 @@ public abstract class GenericAbstractJDBCMessageStore
extends AbstractJDBCMessag
}
@Override
- public <X> ListenableFuture<X> commitTranAsync(final X val)
+ public <X> CompletableFuture<X> commitTranAsync(final X val)
{
try
{
diff --git
a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 4f359ab638..2c9e051e7d 100644
---
a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++
b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -40,13 +40,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
-import com.google.common.util.concurrent.ListenableFuture;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -148,7 +147,7 @@ public class JDBCMessageStoreTest extends
MessageStoreTestCase
assertEquals(transactionalLogId, record.getQueueId(), "Unexpected
queue id");
assertEquals(message.getMessageNumber(), record.getMessageNumber(),
"Unexpected message number");
- final ListenableFuture<Void> future =
transaction.commitTranAsync(null);
+ final CompletableFuture<Void> future =
transaction.commitTranAsync(null);
future.get(1000, TimeUnit.MILLISECONDS);
}
@@ -169,7 +168,7 @@ public class JDBCMessageStoreTest extends
MessageStoreTestCase
final Transaction dequeueTransaction = store.newTransaction();
dequeueTransaction.dequeueMessage(record);
- final ListenableFuture<Void> future =
dequeueTransaction.commitTranAsync(null);
+ final CompletableFuture<Void> future =
dequeueTransaction.commitTranAsync(null);
future.get(1000, TimeUnit.MILLISECONDS);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]