liangyepianzhou commented on code in PR #21406: URL: https://github.com/apache/pulsar/pull/21406#discussion_r1368029765
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java: ########## @@ -255,6 +230,40 @@ public long getCommittedTxnCount() { @Override public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { + buffer.retain(); + return transactionBufferFuture.thenCompose(ignore -> { + if (checkIfNoSnapshot()) { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { + if (changeToReadyStateFromNoSnapshot()) { + timer.newTimeout(TopicTransactionBuffer.this, + takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); + completableFuture.complete(null); + } else { + //This case should not happen. + log.error("[{} ]Failed to change state of transaction buffer to Ready from NoSnapshot", + topic.getName()); + completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer take first snapshot failed, the current state is: " + getState())); + } + }).exceptionally(exception -> { + log.error("Topic {} failed to take snapshot", this.topic.getName()); + completableFuture.completeExceptionally(exception); + return null; + }); + buffer.retain(); + return completableFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); + } else if (checkIfReady()) { + return internalAppendBufferToTxn(txnId, buffer); + } else { + // This case should not happen. + return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( Review Comment: This logic is running in the thread of `transactionBufferFuture.thenCompose`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org