lhotari commented on code in PR #23884:
URL: https://github.com/apache/pulsar/pull/23884#discussion_r1927152617
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -54,10 +56,20 @@ public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
.getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
this.takeSnapshotWriter.getFuture().exceptionally((ex) -> {
- log.error("{} Failed to create snapshot writer", topic.getName());
- topic.close();
- return null;
- });
+ log.error("{} Failed to create snapshot writer", topic.getName());
+ // Don't directly use the topic object to close, because the topicFuture might not
+ // be completed at that time, which could leave closed topics in the cache(at BrokerService).
+ CompletableFuture<Optional<Topic>> topicFuture = topic.getBrokerService().getTopics().get(topic.getName());
+ if (topicFuture != null) {
+ topicFuture.thenAccept(t -> t.ifPresent(v -> {
+ v.close(true).exceptionally(ec -> {
+ log.error("Close topic {} exception", v.getName(), ec);
+ return null;
+ });
+ }));
+ }
Review Comment:
This code is now duplicated in multiple locations. Instead of adding more
duplication, I'd suggest creating a new public method directly in
BrokerService called `closeTopicForcefullyIfExists`. The Javadoc should
describe its purpose.
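
To make the suggestion concrete, here is a rough sketch of what such a method
could look like. It is not taken from the PR. `SketchTopic` and
`BrokerServiceSketch` are hypothetical stand-ins for `Topic` and
`BrokerService` so that the snippet compiles on its own, and the `String`
parameter, the `CompletableFuture<Void>` return type and the use of
`System.err` in place of the logger are all assumptions.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Topic interface; only what the sketch needs.
interface SketchTopic {
    String getName();

    CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
}

// Hypothetical stand-in for the part of BrokerService that caches topic futures.
class BrokerServiceSketch {
    private final Map<String, CompletableFuture<Optional<SketchTopic>>> topics =
            new ConcurrentHashMap<>();

    Map<String, CompletableFuture<Optional<SketchTopic>>> getTopics() {
        return topics;
    }

    /**
     * Forcefully closes the topic registered under the given name, if any.
     *
     * <p>The close goes through the cached topic future rather than a Topic
     * reference held by the caller, because that future might not be completed
     * yet. Closing the object directly could leave a closed topic in the cache.
     *
     * @param topicName name of the topic to close
     * @return a future that completes once the close attempt has finished;
     *         close failures are logged and not propagated
     */
    CompletableFuture<Void> closeTopicForcefullyIfExists(String topicName) {
        CompletableFuture<Optional<SketchTopic>> topicFuture = topics.get(topicName);
        if (topicFuture == null) {
            // Nothing is cached under this name, so there is nothing to close.
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> closeFuture = topicFuture.thenCompose(optionalTopic -> {
            if (optionalTopic.isPresent()) {
                // true = do not wait for clients to disconnect
                return optionalTopic.get().close(true);
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
        return closeFuture.exceptionally(ex -> {
            // Assumption: the real method would use the class logger instead.
            System.err.println("Close topic " + topicName + " exception: " + ex);
            return null;
        });
    }
}
```

With something along these lines in place, each duplicated block could shrink
to a single call such as
`topic.getBrokerService().closeTopicForcefullyIfExists(topic.getName())`.
Returning the future instead of `void` is a design choice in this sketch; it
lets callers and tests wait for the close attempt to finish.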