This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 86ef0a03e66 [fix][broker] fix wrong method name checkTopicExists.
(#24293)
86ef0a03e66 is described below
commit 86ef0a03e663ea71f58a2bdee5593e3e8e1afe94
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue May 13 16:45:54 2025 +0800
[fix][broker] fix wrong method name checkTopicExists. (#24293)
The current method `checkTopicExists` is an asynchronous method but follows
a synchronous naming convention (lacking the `Async` suffix). This naming
inconsistency can mislead developers into assuming it's a blocking operation,
potentially causing misuse in client code. Since this method is `public`, we
cannot remove it directly without breaking backward compatibility.
1. **Introduce a new asynchronous method**:
- Added `checkTopicExistsAsync()` with the correct asynchronous naming
convention.
- Internally delegates to the original `checkTopicExists()` method to
retain existing logic.
2. **Deprecate the original method**:
- Marked `checkTopicExists()` as `@Deprecated` with a note directing
users to the new `checkTopicExistsAsync()`.
3. **Refactor internal usages**:
- Updated all internal calls to use `checkTopicExistsAsync()` instead of
the deprecated method.
4. **Documentation updates**:
- Added Javadoc to `checkTopicExists()` clarifying its deprecated status
and replacement.
This approach maintains backward compatibility while aligning method names
with their asynchronous behavior.
(cherry picked from commit af24849b84a0273228d192939976d23e12168541)
---
.../org/apache/pulsar/broker/admin/AdminResource.java | 2 +-
.../pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 ++--
.../org/apache/pulsar/broker/lookup/TopicLookupBase.java | 2 +-
.../apache/pulsar/broker/namespace/NamespaceService.java | 15 ++++++++++++++-
.../org/apache/pulsar/broker/service/BrokerService.java | 2 +-
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../java/org/apache/pulsar/broker/admin/TopicsTest.java | 4 ++--
.../pulsar/broker/lookup/http/HttpTopicLookupv2Test.java | 6 +++---
.../OneWayReplicatorUsingGlobalPartitionedTest.java | 2 +-
.../broker/service/OneWayReplicatorUsingGlobalZKTest.java | 4 ++--
10 files changed, 28 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index fb7679ff269..841d37fff3a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -983,7 +983,7 @@ public abstract class AdminResource extends
PulsarWebResource {
}
protected CompletableFuture<Void> internalCheckTopicExists(TopicName
topicName) {
- return pulsar().getNamespaceService().checkTopicExists(topicName)
+ return pulsar().getNamespaceService().checkTopicExistsAsync(topicName)
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 169a1b750c2..5e3e99020ae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -740,7 +740,7 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalCheckTopicExists(TopicName
topicName) {
- return pulsar().getNamespaceService().checkTopicExists(topicName)
+ return pulsar().getNamespaceService().checkTopicExistsAsync(topicName)
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
@@ -5412,7 +5412,7 @@ public class PersistentTopicsBase extends AdminResource {
return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
"Only persistent topic can be set as shadow
topic"));
}
-
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
+
futures.add(pulsar().getNamespaceService().checkTopicExistsAsync(shadowTopicName)
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index e01ec83c860..92f045ad90d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -77,7 +77,7 @@ public class TopicLookupBase extends PulsarWebResource {
return CompletableFuture.completedFuture(true);
}
// Case-2: Persistent topic.
- return
pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
+ return
pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenCompose(info
-> {
boolean exists = info.isExists();
info.recycle();
if (exists) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 032a868f2b3..9de84734d6a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1409,8 +1409,21 @@ public class NamespaceService implements AutoCloseable {
}
/***
- * Check topic exists( partitioned or non-partitioned ).
+ * Checks whether the topic exists( partitioned or non-partitioned ).
*/
+ public CompletableFuture<TopicExistsInfo> checkTopicExistsAsync(TopicName
topic) {
+ return checkTopicExists(topic);
+ }
+
+ /**
+ * Checks whether the topic exists( partitioned or non-partitioned ).
+ *
+ * @deprecated This method uses a misleading synchronous name for an
asynchronous operation.
+ * Use {@link #checkTopicExistsAsync(TopicName topic)} instead.
+ *
+ * @see #checkTopicExistsAsync(TopicName topic)
+ */
+ @Deprecated
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName
topic) {
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dd7c74888af..1d52d8599ce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3229,7 +3229,7 @@ public class BrokerService implements Closeable {
if (pulsar.getNamespaceService() == null) {
return FutureUtil.failedFuture(new NamingException("namespace
service is not ready"));
}
- return
pulsar.getNamespaceService().checkTopicExists(topicName).thenComposeAsync(topicExistsInfo
-> {
+ return
pulsar.getNamespaceService().checkTopicExistsAsync(topicName).thenComposeAsync(topicExistsInfo
-> {
final boolean topicExists = topicExistsInfo.isExists();
final TopicType topicType = topicExistsInfo.getTopicType();
final Integer partitions = topicExistsInfo.getPartitions();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f9605449b1f..8bfcd9ca82b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -638,7 +638,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
&& brokerAllowAutoCreate;
if (!autoCreateIfNotExist) {
NamespaceService namespaceService =
getBrokerService().getPulsar().getNamespaceService();
-
namespaceService.checkTopicExists(topicName).thenAccept(topicExistsInfo -> {
+
namespaceService.checkTopicExistsAsync(topicName).thenAccept(topicExistsInfo ->
{
lookupSemaphore.release();
if (!topicExistsInfo.isExists()) {
writeAndFlush(Commands.newPartitionMetadataResponse(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 8940fe4a1f3..68d0bcbe152 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -363,7 +363,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest
{
CompletableFuture existFuture = new CompletableFuture();
existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists());
doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
- doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+
doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any());
CompletableFuture existBooleanFuture = new CompletableFuture();
existBooleanFuture.complete(false);
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
@@ -388,7 +388,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest
{
existFuture.complete(TopicExistsInfo.newTopicNotExists());
CompletableFuture existBooleanFuture = new CompletableFuture();
existBooleanFuture.complete(false);
- doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+
doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any());
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
doReturn(nameSpaceService).when(pulsar).getNamespaceService();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index ab492de055b..9d0851c951f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -152,7 +152,7 @@ public class HttpTopicLookupv2Test {
NamespaceService namespaceService = pulsar.getNamespaceService();
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
future.complete(TopicExistsInfo.newTopicNotExists());
-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
booleanFuture.complete(false);
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
@@ -266,7 +266,7 @@ public class HttpTopicLookupv2Test {
NamespaceService namespaceService = pulsar.getNamespaceService();
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
future.complete(TopicExistsInfo.newTopicNotExists());
-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
booleanFuture.complete(false);
doReturn(future).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
@@ -303,7 +303,7 @@ public class HttpTopicLookupv2Test {
NamespaceService namespaceService = pulsar.getNamespaceService();
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
future.complete(TopicExistsInfo.newTopicNotExists());
-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
// Get the current semaphore first
Integer state1 =
pulsar.getBrokerService().getLookupRequestSemaphore().availablePermits();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 73fb2d82ea1..3be94854a4f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -220,7 +220,7 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
assertFalse(tps.containsKey(topicP1));
assertFalse(tps.containsKey(topicChangeEvents));
assertFalse(pulsar1.getNamespaceService()
- .checkTopicExists(TopicName.get(topicChangeEvents))
+ .checkTopicExistsAsync(TopicName.get(topicChangeEvents))
.get(5, TimeUnit.SECONDS).isExists());
// Verify: schema will be removed in local cluster, and remote
cluster will not.
List<CompletableFuture<StoredSchema>> schemaList13
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 7914ba5aebb..5cbea8df129 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -211,10 +211,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
Map<String, CompletableFuture<Optional<Topic>>> tps =
pulsar1.getBrokerService().getTopics();
assertFalse(tps.containsKey(topic));
assertFalse(tps.containsKey(topicChangeEvents));
-
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
+
assertFalse(pulsar1.getNamespaceService().checkTopicExistsAsync(TopicName.get(topic))
.get(5, TimeUnit.SECONDS).isExists());
assertFalse(pulsar1.getNamespaceService()
- .checkTopicExists(TopicName.get(topicChangeEvents))
+ .checkTopicExistsAsync(TopicName.get(topicChangeEvents))
.get(5, TimeUnit.SECONDS).isExists());
});