This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6b3fb41 Cleanup already deleted namespace topics. (#12597)
6b3fb41 is described below
commit 6b3fb4193857c324c748ea53ae5e6028137b2e35
Author: Jiwei Guo
AuthorDate: Wed Nov 3 21:02:36 2021 +0800
Cleanup already deleted namespace topics. (#12597)
Cherry-picked from #7473.
#7473 fixed the `Cleanup already deleted namespace topics` issue, but #8129
later undid those changes.
### Motivation
We frequently hit an issue when a user removes a cluster from a global
namespace: a broker in the removed cluster fails to unload the topic, and the
namespace bundle stays loaded on that broker. This happens when the broker in
the removed cluster receives the following errors:
```
17:38:52.199 [pulsar-io-22-28] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://prop/global/ns/tp1][east -> west] Failed to close dispatch rate limiter: org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection
:
17:38:52.199 [pulsar-io-22-28] WARN org.apache.pulsar.broker.service.AbstractReplicator - [persistent://prop/global/ns/tp1][east -> west]] Exception: 'org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection' occured while trying to close the producer. retrying again in 0.1 s
:
17:38:52.351 [pulsar-io-22-37] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://prop/global/ns/tp1] Error closing topic
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: Producer was not registered on the connection
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
```
### Modification
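The source broker should return an explicit error code when the producer is
already closed, and the destination broker in the removed cluster should handle
this error and clean up the replicator and topic gracefully.
In this commit, `ServerCnx` handles a close request for a producer or consumer
that is not registered on the connection by logging at INFO level and replying
with `Commands.newSuccess(requestId)` instead of an error response.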
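The replicator-side handling described above can be sketched as a bounded retry loop in which a "not registered" outcome counts as "already closed". This is a minimal sketch in plain Java, not Pulsar's `AbstractReplicator` code: `CloseResult`, `ReplicatorCloser`, and the attempt limit are hypothetical, and the fixed backoff only mirrors the `retrying again in 0.1 s` message in the log above.

```java
import java.util.function.Supplier;

// Hypothetical outcome of a single close attempt as seen by the replicator.
enum CloseResult { CLOSED, NOT_REGISTERED, TRANSIENT_ERROR }

// Hypothetical helper; not part of Pulsar's API.
final class ReplicatorCloser {
    private final int maxAttempts;
    private final long backoffMillis;

    ReplicatorCloser(int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    /**
     * Calls closeAttempt until the producer is gone or attempts run out.
     * NOT_REGISTERED is treated like CLOSED: the remote broker no longer knows
     * the producer, so retrying cannot succeed and would otherwise loop.
     * Returns true when the replicator and topic can be cleaned up.
     */
    boolean close(Supplier<CloseResult> closeAttempt) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            CloseResult result = closeAttempt.get();
            if (result == CloseResult.CLOSED || result == CloseResult.NOT_REGISTERED) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis); // fixed backoff between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return false;
                }
            }
        }
        return false; // still failing after maxAttempts
    }

    public static void main(String[] args) {
        ReplicatorCloser closer = new ReplicatorCloser(5, 100);
        // The remote broker reports the producer as unknown: close finishes at once.
        System.out.println(closer.close(() -> CloseResult.NOT_REGISTERED)); // true
    }
}
```

Treating `NOT_REGISTERED` as terminal is what breaks the WARN/retry cycle in the log; the attempt cap is an extra safeguard of this sketch, not something the commit adds.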
---
.../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5be21cc..ba87469 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1579,9 +1579,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         CompletableFuture<Producer> producerFuture = producers.get(producerId);
         if (producerFuture == null) {
-            log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId);
-            commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
-                    "Producer was not registered on the connection");
+            log.info("[{}] Producer {} was not registered on the connection", remoteAddress, producerId);
+            ctx.writeAndFlush(Commands.newSuccess(requestId));
             return;
         }
@@ -1626,8 +1625,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
         if (consumerFuture == null) {
-            log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId);
-            commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
+            log.info("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress);
+            ctx.writeAndFlush(Commands.newSuccess(requestId));
             return;
         }
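The broker-side effect of the diff, which makes close idempotent for unknown producer and consumer ids, can be modeled without Pulsar on the classpath. In this minimal sketch, `CloseHandlerSketch`, `Reply`, the `String` payloads in the maps, and the `idempotentClose` switch (which toggles between the removed and added lines) are hypothetical. The real `ServerCnx` replies through `Commands.newSuccess(requestId)` or `commandSender.sendErrorResponse(...)` and logs through SLF4J rather than `System.out`. The committed consumer log call passes `consumerId, remoteAddress`, which puts the id in the `[{}]` placeholder; the sketch passes the remote address first, as the producer branch does.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reply type; error == null means success.
final class Reply {
    final long requestId;
    final String error;

    Reply(long requestId, String error) {
        this.requestId = requestId;
        this.error = error;
    }

    boolean isSuccess() {
        return error == null;
    }
}

// Hypothetical model of ServerCnx close handling; not the real broker class.
final class CloseHandlerSketch {
    private final Map<Long, CompletableFuture<String>> producers = new ConcurrentHashMap<>();
    private final Map<Long, CompletableFuture<String>> consumers = new ConcurrentHashMap<>();
    private final String remoteAddress;
    private final boolean idempotentClose; // true = behavior after this commit

    CloseHandlerSketch(String remoteAddress, boolean idempotentClose) {
        this.remoteAddress = remoteAddress;
        this.idempotentClose = idempotentClose;
    }

    void registerProducer(long producerId, String name) {
        producers.put(producerId, CompletableFuture.completedFuture(name));
    }

    void registerConsumer(long consumerId, String name) {
        consumers.put(consumerId, CompletableFuture.completedFuture(name));
    }

    Reply handleCloseProducer(long requestId, long producerId) {
        // Simplification: drop the entry immediately; the real broker closes
        // the producer asynchronously before replying.
        if (producers.remove(producerId) == null) {
            if (!idempotentClose) {
                return new Reply(requestId, "Producer was not registered on the connection");
            }
            System.out.printf("[%s] Producer %d was not registered on the connection%n",
                    remoteAddress, producerId);
            return new Reply(requestId, null); // already gone: report success
        }
        return new Reply(requestId, null);
    }

    Reply handleCloseConsumer(long requestId, long consumerId) {
        if (consumers.remove(consumerId) == null) {
            if (!idempotentClose) {
                return new Reply(requestId, "Consumer not found");
            }
            System.out.printf("[%s] Consumer was not registered on the connection: %d%n",
                    remoteAddress, consumerId);
            return new Reply(requestId, null);
        }
        return new Reply(requestId, null);
    }

    public static void main(String[] args) {
        CloseHandlerSketch cnx = new CloseHandlerSketch("10.0.0.1:6650", true);
        cnx.registerProducer(1L, "replicator-producer");
        System.out.println(cnx.handleCloseProducer(100L, 1L).isSuccess()); // true
        // Second close of the same producer is a logged no-op that still succeeds.
        System.out.println(cnx.handleCloseProducer(101L, 1L).isSuccess()); // true
    }
}
```

With `idempotentClose` set, a replicator that retries a close after its producer is already gone receives a success reply and can move on to cleanup, instead of repeating the WARN shown in the log.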