poorbarcode commented on code in PR #24945:
URL: https://github.com/apache/pulsar/pull/24945#discussion_r2510742548
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -267,47 +257,142 @@ public long getCommittedTxnCount() {
return this.txnCommittedCounter.sum();
}
+ private record PendingAppendingTxnBufferTask(TxnID txnId, long sequenceId,
ByteBuf buffer,
+ CompletableFuture<Position>
pendingPublishFuture) {}
+
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long
sequenceId, ByteBuf buffer) {
- // Method `takeAbortedTxnsSnapshot` will be executed in the different
thread.
- // So we need to retain the buffer in this thread. It will be released
after message persistent.
- buffer.retain();
- CompletableFuture<Position> future =
getPublishFuture().thenCompose(ignore -> {
- if (checkIfNoSnapshot()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- // `publishFuture` will be completed after message persistent,
so there will not be two threads
- // writing snapshots at the same time.
-
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(()
-> {
- if (changeToReadyStateFromNoSnapshot()) {
- timer.newTimeout(TopicTransactionBuffer.this,
- takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
- completableFuture.complete(null);
- } else {
- log.error("[{}]Failed to change state of transaction
buffer to Ready from NoSnapshot",
- topic.getName());
- completableFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(
- "Transaction Buffer take first snapshot
failed, the current state is: " + getState()));
- }
- }).exceptionally(exception -> {
- log.error("Topic {} failed to take snapshot",
this.topic.getName());
- completableFuture.completeExceptionally(exception);
- return null;
- });
- return completableFuture.thenCompose(__ ->
internalAppendBufferToTxn(txnId, buffer));
- } else if (checkIfReady()) {
- return internalAppendBufferToTxn(txnId, buffer);
- } else {
- // `publishFuture` will be completed after transaction buffer
recover completely
- // during initializing, so this case should not happen.
+ synchronized (pendingAppendingTxnBufferTasks) {
+ // The first snapshot is in progress, the following publish tasks
will be pending.
+ if (!pendingAppendingTxnBufferTasks.isEmpty()) {
+ CompletableFuture<Position> res = new CompletableFuture<>();
+ buffer.retain();
+ pendingAppendingTxnBufferTasks.offer(new
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+ return res;
+ }
+
+ // `publishFuture` will be completed after transaction buffer
recover completely
+ // during initializing, so this case should not happen.
+ if (!checkIfReady() && !checkIfNoSnapshot() &&
!checkIfFirstSnapshotting() && !checkIfInitializing()) {
+ log.error("[{}] unexpected state: {} when try to take the
first transaction buffer snapshot",
+ topic.getName(), getState());
return FutureUtil.failedFuture(new
BrokerServiceException.ServiceUnitNotReadyException(
"Transaction Buffer recover failed, the current state
is: " + getState()));
}
- }).whenComplete(((position, throwable) -> buffer.release()));
- setPublishFuture(future);
- return future;
+
+ // The transaction buffer is ready to write.
+ if (checkIfReady()) {
+ return internalAppendBufferToTxn(txnId, buffer, sequenceId);
+ }
+
+ // Pending the current publishing and trigger new snapshot if
needed.
+ CompletableFuture<Position> res = new CompletableFuture<>();
+ buffer.retain();
+ pendingAppendingTxnBufferTasks.offer(new
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+
+ final java.util.function.Consumer<Throwable> failPendingTasks =
throwable -> {
+ synchronized (pendingAppendingTxnBufferTasks) {
+ PendingAppendingTxnBufferTask pendingTask = null;
+ while ((pendingTask =
pendingAppendingTxnBufferTasks.poll()) != null) {
+ pendingTask.buffer.release();
+
pendingTask.pendingPublishFuture.completeExceptionally(throwable);
+ }
+ }
+ };
+
+ final Runnable flushPendingTasks = () -> {
+ PendingAppendingTxnBufferTask pendingTask = null;
+ try {
+ synchronized (pendingAppendingTxnBufferTasks) {
+ while ((pendingTask =
pendingAppendingTxnBufferTasks.poll()) != null) {
+ final ByteBuf data = pendingTask.buffer;
+ final CompletableFuture<Position> pendingFuture =
+ pendingTask.pendingPublishFuture;
+ internalAppendBufferToTxn(pendingTask.txnId,
pendingTask.buffer,
+ pendingTask.sequenceId)
+ .whenComplete((positionAdded, ex3) -> {
+ data.release();
+ if (ex3 != null) {
+
pendingFuture.completeExceptionally(ex3);
+ return;
+ }
+ pendingFuture.complete(positionAdded);
+ });
+ }
+ }
+ } catch (Exception e) {
+ // If there are some error when adding entries or caching
entries, this log will be printed.
+ log.error("[{}] Failed to flush pending publishing
requests after taking the first"
+ + " snapshot.",
+ topic.getName(), e);
+ if (pendingTask != null) {
+ pendingTask.buffer.release();
+
pendingTask.pendingPublishFuture.completeExceptionally(e);
Review Comment:
Added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org