This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 033a3b6e39b [optimize][txn] Optimize transaction lowWaterMark to clean
useless data faster (#15592)
033a3b6e39b is described below
commit 033a3b6e39bde79bf7eea146181805ca541f6a75
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat May 21 17:15:56 2022 +0800
[optimize][txn] Optimize transaction lowWaterMark to clean useless data
faster (#15592)
(cherry picked from commit fcf5e148b055d617db37eef3c40d4004e74190a5)
---
.../buffer/impl/TopicTransactionBuffer.java | 72 +++++------
.../pendingack/impl/PendingAckHandleImpl.java | 49 +++++--
.../buffer/TransactionLowWaterMarkTest.java | 142 ++++++++++++++++++++-
.../pendingack/PendingAckPersistentTest.java | 24 ++--
4 files changed, 223 insertions(+), 64 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 2ddac086294..7bb19d80086 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.SneakyThrows;
@@ -96,8 +97,13 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final CompletableFuture<Void> transactionBufferFuture = new
CompletableFuture<>();
+ /**
+ * The map is used to store the lowWaterMarks which key is TC ID and value
is lowWaterMark of the TC.
+ */
private final ConcurrentHashMap<Long, Long> lowWaterMarks = new
ConcurrentHashMap<>();
+ private final Semaphore handleLowWaterMark = new Semaphore(1);
+
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
this.topic = topic;
@@ -285,13 +291,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
- lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark)
-> {
- if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
- return lowWaterMark;
- } else {
- return oldLowWaterMark;
- }
- });
if (log.isDebugEnabled()) {
log.debug("Transaction {} commit on topic {}.", txnID.toString(),
topic.getName());
}
@@ -332,13 +331,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
- lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark)
-> {
- if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
- return lowWaterMark;
- } else {
- return oldLowWaterMark;
- }
- });
if (log.isDebugEnabled()) {
log.debug("Transaction {} abort on topic {}.", txnID.toString(),
topic.getName());
}
@@ -358,12 +350,12 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
synchronized (TopicTransactionBuffer.this) {
aborts.put(txnID, (PositionImpl) position);
updateMaxReadPosition(txnID);
- handleLowWaterMark(txnID, lowWaterMark);
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
clearAbortedTransactions();
takeSnapshotByChangeTimes();
}
completableFuture.complete(null);
+ handleLowWaterMark(txnID, lowWaterMark);
}
@Override
@@ -384,30 +376,36 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
- if (!ongoingTxns.isEmpty()) {
- TxnID firstTxn = ongoingTxns.firstKey();
- if (firstTxn.getMostSigBits() == txnID.getMostSigBits() &&
lowWaterMark >= firstTxn.getLeastSigBits()) {
- ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
- firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
- try {
- topic.getManagedLedger().asyncAddEntry(abortMarker, new
AsyncCallbacks.AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
- synchronized (TopicTransactionBuffer.this) {
- aborts.put(firstTxn, (PositionImpl) position);
- updateMaxReadPosition(firstTxn);
- }
- }
-
- @Override
- public void addFailed(ManagedLedgerException
exception, Object ctx) {
- log.error("Failed to abort low water mark for txn
{}", txnID, exception);
- }
- }, null);
- } finally {
- abortMarker.release();
+ lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark)
-> {
+ if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+ return lowWaterMark;
+ } else {
+ return oldLowWaterMark;
+ }
+ });
+ if (handleLowWaterMark.tryAcquire()) {
+ if (!ongoingTxns.isEmpty()) {
+ TxnID firstTxn = ongoingTxns.firstKey();
+ long tCId = firstTxn.getMostSigBits();
+ Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+ if (lowWaterMarkOfFirstTxnId != null &&
firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+ abortTxn(firstTxn, lowWaterMarkOfFirstTxnId)
+ .thenRun(() -> {
+ log.warn("Successes to abort low water mark
for txn [{}], topic [{}],"
+ + " lowWaterMark [{}]", firstTxn,
topic.getName(), lowWaterMarkOfFirstTxnId);
+ handleLowWaterMark.release();
+ })
+ .exceptionally(ex -> {
+ log.warn("Failed to abort low water mark for
txn {}, topic [{}], "
+ + "lowWaterMark [{}], ", firstTxn,
topic.getName(), lowWaterMarkOfFirstTxnId,
+ ex);
+ handleLowWaterMark.release();
+ return null;
+ });
+ return;
}
}
+ handleLowWaterMark.release();
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index ab5a8ff1454..4fdc298f303 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -28,10 +28,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -113,6 +115,13 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
private final BlockingQueue<Runnable> acceptQueue = new
LinkedBlockingDeque<>();
+ /**
+ * The map is used to store the lowWaterMarks which key is TC ID and value
is lowWaterMark of the TC.
+ */
+ private final ConcurrentHashMap<Long, Long> lowWaterMarks = new
ConcurrentHashMap<>();
+
+ private final Semaphore handleLowWaterMark = new Semaphore(1);
+
@Getter
private final ExecutorService internalPinnedExecutor;
@@ -593,20 +602,34 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
}
private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
- if (individualAckOfTransaction != null &&
!individualAckOfTransaction.isEmpty()) {
- TxnID firstTxn = individualAckOfTransaction.firstKey();
-
- if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
- && firstTxn.getLeastSigBits() <= lowWaterMark) {
- abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
- log.warn("[{}] Transaction pending ack handle low water
mark success! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName, txnID,
lowWaterMark);
- }).exceptionally(e -> {
- log.warn("[{}] Transaction pending ack handle low water
mark fail! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName, txnID,
lowWaterMark);
- return null;
- });
+ lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark)
-> {
+ if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+ return lowWaterMark;
+ } else {
+ return oldLowWaterMark;
+ }
+ });
+
+ if (handleLowWaterMark.tryAcquire()) {
+ if (individualAckOfTransaction != null &&
!individualAckOfTransaction.isEmpty()) {
+ TxnID firstTxn = individualAckOfTransaction.firstKey();
+ long tCId = firstTxn.getMostSigBits();
+ Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+ if (lowWaterMarkOfFirstTxnId != null &&
firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+ abortTxn(firstTxn, null,
lowWaterMarkOfFirstTxnId).thenRun(() -> {
+ log.warn("[{}] Transaction pending ack handle low
water mark success! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, firstTxn,
lowWaterMarkOfFirstTxnId);
+ handleLowWaterMark.release();
+ }).exceptionally(ex -> {
+ log.warn("[{}] Transaction pending ack handle low
water mark fail! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, firstTxn,
lowWaterMarkOfFirstTxnId);
+ handleLowWaterMark.release();
+ return null;
+ });
+ return;
+ }
}
+ handleLowWaterMark.release();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index ba0659892b4..1f012d7a6b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -31,13 +31,14 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -52,7 +53,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -71,7 +71,7 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class TransactionLowWaterMarkTest extends TransactionTestBase {
- private static final String TOPIC = NAMESPACE1 + "/test-topic";
+ private static final String TOPIC = "persistent://" + NAMESPACE1 +
"/test-topic";
@BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
@@ -216,7 +216,7 @@ public class TransactionLowWaterMarkTest extends
TransactionTestBase {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topics =
(ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>>) field
.get(getPulsarServiceList().get(i).getBrokerService());
- CompletableFuture<Optional<Topic>> completableFuture =
topics.get("persistent://" + TOPIC);
+ CompletableFuture<Optional<Topic>> completableFuture =
topics.get(TOPIC);
if (completableFuture != null) {
Optional<Topic> topic = completableFuture.get();
if (topic.isPresent()) {
@@ -327,4 +327,138 @@ public class TransactionLowWaterMarkTest extends
TransactionTestBase {
// no-op
}
}
+
+ @Test
+ public void testLowWaterMarkForDifferentTC() throws Exception {
+ String subName = "sub";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(TOPIC)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(TOPIC)
+ .subscriptionName(subName)
+ .subscribe();
+
+ Transaction txn1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ Transaction txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ while (txn2.getTxnID().getMostSigBits() ==
txn1.getTxnID().getMostSigBits()) {
+ txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ }
+ Transaction txn3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ while (txn3.getTxnID().getMostSigBits() !=
txn2.getTxnID().getMostSigBits()) {
+ txn3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ }
+
+ Transaction txn4 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ while (txn4.getTxnID().getMostSigBits() !=
txn1.getTxnID().getMostSigBits()) {
+ txn4 = pulsarClient.newTransaction()
+ .withTransactionTimeout(500, TimeUnit.SECONDS)
+ .build().get();
+ }
+
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().send();
+ }
+
+ producer.newMessage(txn1).send();
+ producer.newMessage(txn2).send();
+ producer.newMessage(txn3).send();
+ producer.newMessage(txn4).send();
+
+ Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message1.getMessageId(), txn1);
+ Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message2.getMessageId(), txn2);
+ Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message3.getMessageId(), txn3);
+ Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message4.getMessageId(), txn4);
+
+ txn1.commit().get();
+ txn2.commit().get();
+
+ Field field = TransactionImpl.class.getDeclaredField("state");
+ field.setAccessible(true);
+ field.set(txn1, TransactionImpl.State.OPEN);
+ field.set(txn2, TransactionImpl.State.OPEN);
+
+ producer.newMessage(txn1).send();
+ producer.newMessage(txn2).send();
+
+ Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message5.getMessageId(), txn1);
+ Message<byte[]> message6 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message6.getMessageId(), txn2);
+
+ txn3.commit().get();
+ TxnID txnID1 = txn1.getTxnID();
+ TxnID txnID2 = txn2.getTxnID();
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(checkTxnIsOngoingInTP(txnID1, subName));
+ assertTrue(checkTxnIsOngoingInTP(txnID2, subName));
+ assertTrue(checkTxnIsOngoingInTB(txnID1));
+ assertTrue(checkTxnIsOngoingInTB(txnID2));
+ });
+
+ txn4.commit().get();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(checkTxnIsOngoingInTP(txnID1, subName));
+ assertFalse(checkTxnIsOngoingInTP(txnID2, subName));
+ assertFalse(checkTxnIsOngoingInTB(txnID1));
+ assertFalse(checkTxnIsOngoingInTB(txnID2));
+ });
+ }
+
+ private boolean checkTxnIsOngoingInTP(TxnID txnID, String subName) throws
Exception {
+ PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0)
+ .getBrokerService()
+ .getTopic(TopicName.get(TOPIC).toString(), false)
+ .get().get();
+
+ PersistentSubscription persistentSubscription =
persistentTopic.getSubscription(subName);
+
+ Field field1 =
PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+ field1.setAccessible(true);
+ PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)
field1.get(persistentSubscription);
+
+ Field field2 =
PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+ field2.setAccessible(true);
+ LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>
individualAckOfTransaction =
+ (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>)
field2.get(pendingAckHandle);
+ return individualAckOfTransaction.containsKey(txnID);
+ }
+
+ private boolean checkTxnIsOngoingInTB(TxnID txnID) throws Exception {
+ PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0)
+ .getBrokerService()
+ .getTopic(TopicName.get(TOPIC).toString(), false)
+ .get().get();
+
+ TopicTransactionBuffer topicTransactionBuffer =
+ (TopicTransactionBuffer)
persistentTopic.getTransactionBuffer();
+ Field field3 =
TopicTransactionBuffer.class.getDeclaredField("ongoingTxns");
+ field3.setAccessible(true);
+ LinkedMap<TxnID, PositionImpl> ongoingTxns =
+ (LinkedMap<TxnID, PositionImpl>)
field3.get(topicTransactionBuffer);
+ return ongoingTxns.containsKey(txnID);
+
+ }
+
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index a64707f1ae8..6683be138da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.pendingack;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -45,7 +44,6 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -55,8 +53,6 @@ import
org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -545,24 +541,32 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
.get();
PersistentSubscription persistentSubscription =
persistentTopic.getSubscription(subName);
+ Field field1 =
PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+ field1.setAccessible(true);
+ PendingAckHandleImpl oldPendingAckHandle = (PendingAckHandleImpl)
field1.get(persistentSubscription);
+ Field field2 =
PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+ field2.setAccessible(true);
+ LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>
oldIndividualAckOfTransaction =
+ (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>)
field2.get(oldPendingAckHandle);
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(oldIndividualAckOfTransaction.size(), 0));
+
PendingAckHandleImpl pendingAckHandle = new
PendingAckHandleImpl(persistentSubscription);
Method method =
PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
method.setAccessible(true);
method.invoke(pendingAckHandle);
- Field field1 =
PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
- field1.setAccessible(true);
- CompletableFuture<PendingAckStore> completableFuture =
- (CompletableFuture<PendingAckStore>)
field1.get(pendingAckHandle);
+ Field field3 =
PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+ field3.setAccessible(true);
Awaitility.await().until(() -> {
+ CompletableFuture<PendingAckStore> completableFuture =
+ (CompletableFuture<PendingAckStore>)
field3.get(pendingAckHandle);
completableFuture.get();
return true;
});
- Field field2 =
PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
- field2.setAccessible(true);
+
LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>
individualAckOfTransaction =
(LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>)
field2.get(pendingAckHandle);