BewareMyPower commented on code in PR #23884:
URL: https://github.com/apache/pulsar/pull/23884#discussion_r1927292797


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -54,10 +56,20 @@ public 
SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
                 .getTransactionBufferSnapshotServiceFactory()
                 
.getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
         this.takeSnapshotWriter.getFuture().exceptionally((ex) -> {
-                    log.error("{} Failed to create snapshot writer", 
topic.getName());
-                    topic.close();
-                    return null;
-                });
+            log.error("{} Failed to create snapshot writer", topic.getName());
+            // Don't directly use the topic object to close, because the 
topicFuture might not
+            // be completed at that time, which could leave closed topics in 
the cache(at BrokerService).
+            CompletableFuture<Optional<Topic>> topicFuture = 
topic.getBrokerService().getTopics().get(topic.getName());
+            if (topicFuture != null) {
+                topicFuture.thenAccept(t -> t.ifPresent(v -> {
+                    v.close(true).exceptionally(ec -> {
+                        log.error("Close topic {} exception", v.getName(), ec);

Review Comment:
   If `close` failed, the topic will still be cached. Should you call 
`BrokerService#removeTopicFromCache` in this case?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -54,10 +56,20 @@ public 
SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
                 .getTransactionBufferSnapshotServiceFactory()
                 
.getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
         this.takeSnapshotWriter.getFuture().exceptionally((ex) -> {
-                    log.error("{} Failed to create snapshot writer", 
topic.getName());
-                    topic.close();
-                    return null;
-                });
+            log.error("{} Failed to create snapshot writer", topic.getName());

Review Comment:
   It would be better to add `ex` to the log message here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -54,10 +56,20 @@ public 
SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
                 .getTransactionBufferSnapshotServiceFactory()
                 
.getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject());
         this.takeSnapshotWriter.getFuture().exceptionally((ex) -> {
-                    log.error("{} Failed to create snapshot writer", 
topic.getName());
-                    topic.close();
-                    return null;
-                });
+            log.error("{} Failed to create snapshot writer", topic.getName());
+            // Don't directly use the topic object to close, because the 
topicFuture might not
+            // be completed at that time, which could leave closed topics in 
the cache(at BrokerService).
+            CompletableFuture<Optional<Topic>> topicFuture = 
topic.getBrokerService().getTopics().get(topic.getName());
+            if (topicFuture != null) {
+                topicFuture.thenAccept(t -> t.ifPresent(v -> {

Review Comment:
   I'm not sure why the value of `topics` is an `Optional`. I just have a 
question that when could the value be `Optional.empty()`? If so, an empty 
`Optional` will be cached and never removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to