This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5583102aae1 [fix] [log] Do not print error log if tenant/namespace
does not exist when calling get topic metadata (#23291)
5583102aae1 is described below
commit 5583102aae135f5f62884f83e1ddd927b24ee737
Author: fengyubiao <[email protected]>
AuthorDate: Fri Sep 27 16:34:19 2024 +0800
[fix] [log] Do not print error log if tenant/namespace does not exist when
calling get topic metadata (#23291)
---
.../apache/pulsar/broker/service/ServerCnx.java | 41 ++++++++++++++---
.../broker/admin/GetPartitionMetadataTest.java | 51 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5b67b01115e..aedd68d416f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -63,6 +63,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -672,8 +674,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(),
requestId);
} else {
- log.warn("Failed to get
Partitioned Metadata [{}] {}: {}", remoteAddress,
- topicName,
ex.getMessage(), ex);
ServerError error =
ServerError.ServiceNotReady;
if (ex instanceof
MetadataStoreException) {
error =
ServerError.MetadataError;
@@ -685,6 +685,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
error =
ServerError.MetadataError;
}
}
+ if (error ==
ServerError.TopicNotFound) {
+ log.info("Trying to get
Partitioned Metadata for a resource not exist"
+ + "[{}] {}:
{}", remoteAddress,
+ topicName,
ex.getMessage());
+ } else {
+ log.warn("Failed to get
Partitioned Metadata [{}] {}: {}",
+ remoteAddress,
topicName, ex.getMessage(), ex);
+ }
commandSender.sendPartitionMetadataResponse(error, ex.getMessage(),
requestId);
}
@@ -702,6 +710,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "partition-metadata",
getPrincipal(), Optional.of(topicName), ex);
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof WebApplicationException restException) {
+ if (restException.getResponse().getStatus() ==
Response.Status.NOT_FOUND.getStatusCode()) {
+
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+ "Tenant or namespace or topic does not exist: " +
topicName.getNamespace() ,
+ requestId));
+ lookupSemaphore.release();
+ return null;
+ }
+ }
final String msg = "Exception occurred while trying to
authorize get Partition Metadata";
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
msg,
requestId));
@@ -3663,13 +3681,22 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private static void logAuthException(SocketAddress remoteAddress, String
operation,
String principal, Optional<TopicName>
topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" +
t.toString()).orElse("");
- if (ex instanceof AuthenticationException) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={},
principal={}{}, reason={}",
- remoteAddress, operation, principal, topicString,
ex.getMessage());
- } else {
- log.error("[{}] Error trying to authenticate: operation={},
principal={}{}",
- remoteAddress, operation, principal, topicString, ex);
+ remoteAddress, operation, principal, topicString,
actEx.getMessage());
+ return;
+ } else if (actEx instanceof WebApplicationException restException){
+ // Do not print error log if a user tries to access a not found
resource.
+ if (restException.getResponse().getStatus() ==
Response.Status.NOT_FOUND.getStatusCode()) {
+ log.info("[{}] Trying to authenticate for a topic which under
a namespace not exists: operation={},"
+ + " principal={}{}, reason: {}",
+ remoteAddress, operation, principal, topicString,
actEx.getMessage());
+ return;
+ }
}
+ log.error("[{}] Error trying to authenticate: operation={},
principal={}{}",
+ remoteAddress, operation, principal, topicString, ex);
}
private static void logNamespaceNameAuthException(SocketAddress
remoteAddress, String operation,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index 87bc4267b48..e9a639697d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -578,4 +578,55 @@ public class GetPartitionMetadataTest {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}
+
+ @Test(dataProvider = "topicDomains")
+ public void testNamespaceNotExist(TopicDomain topicDomain) throws
Exception {
+ int lookupPermitsBefore = getLookupRequestPermits();
+ final String namespaceNotExist =
BrokerTestUtil.newUniqueName("public/ns");
+ final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist
+ "/tp");
+ PulsarClientImpl[] clientArray = getClientsToTest(false);
+ for (PulsarClientImpl client : clientArray) {
+ try {
+ PartitionedTopicMetadata topicMetadata = client
+ .getPartitionedTopicMetadata(topicNameStr, true, true)
+ .join();
+ log.info("Get topic metadata: {}", topicMetadata.partitions);
+ fail("Expected a not found ex");
+ } catch (Exception ex) {
+ Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ assertTrue(unwrapEx instanceof
PulsarClientException.BrokerMetadataException ||
+ unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException);
+ }
+ }
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+ }
+
+ @Test(dataProvider = "topicDomains")
+ public void testTenantNotExist(TopicDomain topicDomain) throws Exception {
+ int lookupPermitsBefore = getLookupRequestPermits();
+ final String tenantNotExist = BrokerTestUtil.newUniqueName("tenant");
+ final String namespaceNotExist =
BrokerTestUtil.newUniqueName(tenantNotExist + "/default");
+ final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist
+ "/tp");
+ PulsarClientImpl[] clientArray = getClientsToTest(false);
+ for (PulsarClientImpl client : clientArray) {
+ try {
+ PartitionedTopicMetadata topicMetadata = client
+ .getPartitionedTopicMetadata(topicNameStr, true, true)
+ .join();
+ log.info("Get topic metadata: {}", topicMetadata.partitions);
+ fail("Expected a not found ex");
+ } catch (Exception ex) {
+ Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ assertTrue(unwrapEx instanceof
PulsarClientException.BrokerMetadataException ||
+ unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException);
+ }
+ }
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+ }
}