This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b3fb41  Cleanup already deleted namespace topics. (#12597)
6b3fb41 is described below

commit 6b3fb4193857c324c748ea53ae5e6028137b2e35
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Nov 3 21:02:36 2021 +0800

    Cleanup already deleted namespace topics. (#12597)
    
    Cherry pick from #7473.
    
    #7473 has fix the `Cleanup already deleted namespace topics` issue, but 
with #8129 involved, changes have been
    changed back.
    
    ### Motivation
    We are having frequent issues when user removes cluster from the global 
namespace where broker from removed-cluster fails to unload topic and namespace 
bundle still loaded with the broker. It happens when broker from 
removed-cluster receives below error
    ```
    17:38:52.199 [pulsar-io-22-28] ERROR 
org.apache.pulsar.broker.service.persistent.PersistentReplicator - 
[persistent://prop/global/ns/tp1][east -> west] Failed to close dispatch rate 
limiter: org.apache.pulsar.client.api.PulsarClientException: Producer was not 
registered on the connection
    :
    17:38:52.199 [pulsar-io-22-28] WARN  
org.apache.pulsar.broker.service.AbstractReplicator - 
[persistent://prop/global/ns/tp1][east -> west]] Exception: 
'org.apache.pulsar.client.api.PulsarClientException: Producer was not 
registered on the connection' occured while trying to close the producer. 
retrying again in 0.1 s
    :
    17:38:52.351 [pulsar-io-22-37] ERROR 
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://prop/global/ns/tp1] Error closing topic
    java.util.concurrent.CompletionException: 
org.apache.pulsar.client.api.PulsarClientException: Producer was not registered 
on the connection
            at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
 ~[?:?]
    ```
    
    ### Modification
    Source Broker should return explicit error-code when producer is already 
closed and dest-broker from removed-cluster should handle this error and clean 
up the replicator and topic gracefully.
---
 .../main/java/org/apache/pulsar/broker/service/ServerCnx.java    | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5be21cc..ba87469 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1579,9 +1579,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         CompletableFuture<Producer> producerFuture = producers.get(producerId);
         if (producerFuture == null) {
-            log.warn("[{}] Producer was not registered on the connection. 
producerId={}", remoteAddress, producerId);
-            commandSender.sendErrorResponse(requestId, 
ServerError.UnknownError,
-                    "Producer was not registered on the connection");
+            log.info("[{}] Producer {} was not registered on the connection", 
remoteAddress, producerId);
+            ctx.writeAndFlush(Commands.newSuccess(requestId));
             return;
         }
 
@@ -1626,8 +1625,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
         if (consumerFuture == null) {
-            log.warn("[{}] Consumer was not registered on the connection: 
consumerId={}", remoteAddress, consumerId);
-            commandSender.sendErrorResponse(requestId, 
ServerError.MetadataError, "Consumer not found");
+            log.info("[{}] Consumer was not registered on the connection: {}", 
consumerId, remoteAddress);
+            ctx.writeAndFlush(Commands.newSuccess(requestId));
             return;
         }
 

Reply via email to