poorbarcode commented on code in PR #24945: URL: https://github.com/apache/pulsar/pull/24945#discussion_r2508519533
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PendingAppendingTxnBufferTask.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.netty.buffer.ByteBuf; +import java.util.concurrent.CompletableFuture; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.client.api.transaction.TxnID; + +@Getter +@AllArgsConstructor +public class PendingAppendingTxnBufferTask { + + private final TxnID txnId; + private final long sequenceId; + private final ByteBuf buffer; + private CompletableFuture<Position> pendingPublishFuture; +} Review Comment: Changed it to a record ########## pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java: ########## @@ -34,4 +41,12 @@ public TransactionBufferTestImpl(PersistentTopic topic) { public State getState() { return state == null ? 
super.getState() : state; } + + @Override + protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer, long seq) { + if (followingInternalAppendBufferToTxnFail) { + return CompletableFuture.failedFuture(new RuntimeException("fail")); Review Comment: Added ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java: ########## @@ -269,45 +260,138 @@ public long getCommittedTxnCount() { @Override public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { - // Method `takeAbortedTxnsSnapshot` will be executed in the different thread. - // So we need to retain the buffer in this thread. It will be released after message persistent. - buffer.retain(); - CompletableFuture<Position> future = getPublishFuture().thenCompose(ignore -> { - if (checkIfNoSnapshot()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - // `publishFuture` will be completed after message persistent, so there will not be two threads - // writing snapshots at the same time. 
- snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { - if (changeToReadyStateFromNoSnapshot()) { - timer.newTimeout(TopicTransactionBuffer.this, - takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - completableFuture.complete(null); - } else { - log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", - topic.getName()); - completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( - "Transaction Buffer take first snapshot failed, the current state is: " + getState())); - } - }).exceptionally(exception -> { - log.error("Topic {} failed to take snapshot", this.topic.getName()); - completableFuture.completeExceptionally(exception); - return null; - }); - return completableFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); - } else if (checkIfReady()) { - return internalAppendBufferToTxn(txnId, buffer); - } else { - // `publishFuture` will be completed after transaction buffer recover completely - // during initializing, so this case should not happen. + synchronized (pendingAppendingTxnBufferTasks) { + // The first snapshot is in progress, the following publish tasks will be pending. + if (!pendingAppendingTxnBufferTasks.isEmpty()) { + CompletableFuture<Position> res = new CompletableFuture<>(); + buffer.retain(); + pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); + return res; + } + + // `publishFuture` will be completed after transaction buffer recover completely + // during initializing, so this case should not happen. 
+ if (!checkIfReady() && !checkIfNoSnapshot() && !checkIfFirstSnapshotting() && !checkIfInitializing()) { + log.error("[{}] unexpected state: {} when try to take the first transaction buffer snapshot", + topic.getName(), getState()); return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( "Transaction Buffer recover failed, the current state is: " + getState())); } - }).whenComplete(((position, throwable) -> buffer.release())); - setPublishFuture(future); - return future; + + // The transaction buffer is ready to write. + if (checkIfReady()) { + return internalAppendBufferToTxn(txnId, buffer, sequenceId); + } + + // Pending the current publishing and trigger new snapshot if needed. + CompletableFuture<Position> res = new CompletableFuture<>(); + buffer.retain(); + pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); + + final java.util.function.Consumer<Throwable> failPendingTasks = throwable -> { + synchronized (pendingAppendingTxnBufferTasks) { + PendingAppendingTxnBufferTask pendingTask = null; + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + pendingTask.getBuffer().release(); + pendingTask.getPendingPublishFuture().completeExceptionally(throwable); + } + } + }; + + final Runnable flushPendingTasks = () -> { + PendingAppendingTxnBufferTask pendingTask = null; + try { + synchronized (pendingAppendingTxnBufferTasks) { + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + final ByteBuf data = pendingTask.getBuffer(); + final CompletableFuture<Position> pendingFuture = + pendingTask.getPendingPublishFuture(); + internalAppendBufferToTxn(pendingTask.getTxnId(), pendingTask.getBuffer(), + pendingTask.getSequenceId()) + .whenComplete((positionAdded, ex3) -> { + if (ex3 != null) { + data.release(); + pendingFuture.completeExceptionally(ex3); + return; + } + pendingFuture.complete(positionAdded); + data.release(); Review Comment: Moved -- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org
