This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b97483a198 [feat] PIP-468: segment-aware admin endpoints for cursor 
lifecycle (#25717)
5b97483a198 is described below

commit 5b97483a1981fa3cdd8748a6db022a6ef31a44b1
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 14:41:37 2026 -0700

    [feat] PIP-468: segment-aware admin endpoints for cursor lifecycle (#25717)
---
 .../pulsar/broker/admin/v2/ScalableTopics.java     |  24 ++---
 .../apache/pulsar/broker/admin/v2/Segments.java    | 101 +++++++++++++++++++++
 .../service/scalable/ScalableTopicController.java  |  26 ++----
 .../scalable/ScalableTopicControllerTest.java      |  19 ++--
 .../api/v5/V5ScalableSubscriptionAdminTest.java    |  91 +++++++++++++++++++
 .../apache/pulsar/client/admin/ScalableTopics.java |  26 ++++++
 .../client/admin/internal/ScalableTopicsImpl.java  |  22 +++++
 7 files changed, 263 insertions(+), 46 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 212d0019119..6b73fdf00d6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -58,7 +58,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentTopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -704,8 +703,9 @@ public class ScalableTopics extends AdminResource {
     }
 
     /**
-     * Best-effort delete underlying persistent topics for all segments.
-     * Uses the internal admin client which handles cross-broker routing.
+     * Best-effort delete the underlying topic for every segment in the DAG. 
Uses the
+     * segment-aware admin endpoint, which routes to the segment-owning broker 
via the
+     * standard bundle-ownership lookup.
      */
     private CompletableFuture<Void> deleteSegmentTopics(TopicName parentTopic,
                                                          ScalableTopicMetadata 
metadata,
@@ -714,10 +714,11 @@ public class ScalableTopics extends AdminResource {
             var admin = pulsar().getAdminClient();
             CompletableFuture<?>[] futures = 
metadata.getSegments().values().stream()
                     .map(seg -> {
-                        String name = segmentPersistentName(parentTopic, seg);
-                        return admin.topics().deleteAsync(name, force)
+                        String segmentTopicName = SegmentTopicName.fromParent(
+                                parentTopic, seg.hashRange(), 
seg.segmentId()).toString();
+                        return 
admin.scalableTopics().deleteSegmentAsync(segmentTopicName, force)
                                 .exceptionally(ex -> {
-                                    log.warn().attr("segment", 
name).exceptionMessage(ex)
+                                    log.warn().attr("segment", 
segmentTopicName).exceptionMessage(ex)
                                             .log("Failed to delete segment 
topic");
                                     return null;
                                 });
@@ -730,15 +731,4 @@ public class ScalableTopics extends AdminResource {
             return CompletableFuture.completedFuture(null);
         }
     }
-
-    /**
-     * Convert a segment:// topic name to persistent:// for the underlying 
managed ledger topic.
-     */
-    private String segmentPersistentName(TopicName parentTopic, SegmentInfo 
segment) {
-        TopicName segTopic = SegmentTopicName.fromParent(
-                parentTopic, segment.hashRange(), segment.segmentId());
-        return "persistent://" + segTopic.getTenant() + "/"
-                + segTopic.getNamespacePortion() + "/"
-                + segTopic.getLocalName();
-    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 189e632b7e3..e8d4eeff654 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -181,6 +181,107 @@ public class Segments extends AdminResource {
                 });
     }
 
+    @PUT
+    
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}")
+    @ApiOperation(value = "Create a subscription cursor on the segment topic 
at the earliest"
+            + " position. Super-user only.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Subscription cursor created 
(or already existed)"),
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void createSubscription(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify the parent topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", 
required = true)
+            @PathParam("descriptor") String descriptor,
+            @ApiParam(value = "Subscription name", required = true)
+            @PathParam("subscription") String subscription,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateNamespaceName(tenant, namespace);
+        TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
+
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
+                .thenCompose(__ -> 
pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString()))
+                .thenCompose(topic -> topic.createSubscription(subscription,
+                        CommandSubscribe.InitialPosition.Earliest, false, 
null))
+                .thenAccept(__ -> {
+                    log.info().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
+                            .attr("subscription", subscription)
+                            .log("Created subscription on segment topic");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
+                            .attr("subscription", subscription)
+                            .exception(ex).log("Failed to create subscription 
on segment");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @DELETE
+    
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}")
+    @ApiOperation(value = "Delete a subscription cursor on the segment topic. 
Super-user only.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Subscription cursor deleted 
(or never existed)"),
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void deleteSubscription(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify the parent topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", 
required = true)
+            @PathParam("descriptor") String descriptor,
+            @ApiParam(value = "Subscription name", required = true)
+            @PathParam("subscription") String subscription,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateNamespaceName(tenant, namespace);
+        TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
+
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
+                .thenCompose(__ -> 
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+                .thenCompose(optTopic -> {
+                    if (optTopic.isEmpty()) {
+                        // Topic not loaded → no cursor to delete. Idempotent 
success.
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    var sub = optTopic.get().getSubscription(subscription);
+                    if (sub == null) {
+                        // Subscription doesn't exist on this segment — 
idempotent success.
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return sub.delete();
+                })
+                .thenAccept(__ -> {
+                    log.info().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
+                            .attr("subscription", subscription)
+                            .log("Deleted subscription on segment topic");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
+                            .attr("subscription", subscription)
+                            .exception(ex).log("Failed to delete subscription 
on segment");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     @GET
     
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/backlog")
     @ApiOperation(value = "Number of unconsumed entries in the segment topic 
for the "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index cf43239f126..44d2230e49e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -729,11 +729,11 @@ public class ScalableTopicController {
     }
 
     private CompletableFuture<Void> createSubscriptionOnSegment(SegmentInfo 
segment, String subscription) {
-        String persistentName = toSegmentUnderlyingPersistentName(segment);
+        String segmentTopicName = toSegmentPersistentName(segment);
         try {
             return brokerService.getPulsar().getAdminClient()
-                    .topics().createSubscriptionAsync(persistentName, 
subscription,
-                            org.apache.pulsar.client.api.MessageId.earliest)
+                    .scalableTopics()
+                    .createSegmentSubscriptionAsync(segmentTopicName, 
subscription)
                     .exceptionally(ex -> {
                         Throwable cause = 
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
                         if (cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException.ConflictException) {
@@ -748,17 +748,18 @@ public class ScalableTopicController {
     }
 
     private CompletableFuture<Void> deleteSubscriptionOnSegment(SegmentInfo 
segment, String subscription) {
-        String persistentName = toSegmentUnderlyingPersistentName(segment);
+        String segmentTopicName = toSegmentPersistentName(segment);
         try {
             return brokerService.getPulsar().getAdminClient()
-                    .topics().deleteSubscriptionAsync(persistentName, 
subscription, true)
+                    .scalableTopics()
+                    .deleteSegmentSubscriptionAsync(segmentTopicName, 
subscription)
                     .exceptionally(ex -> {
                         Throwable cause = 
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
                         if (cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException) {
                             return null;
                         }
                         log.warn().attr("subscription", subscription)
-                                .attr("segment", 
persistentName).exceptionMessage(cause)
+                                .attr("segment", 
segmentTopicName).exceptionMessage(cause)
                                 .log("Failed to delete subscription from 
segment");
                         return null;
                     });
@@ -1114,19 +1115,6 @@ public class ScalableTopicController {
         return segmentTopicName.toString();
     }
 
-    /**
-     * Return the {@code persistent://} form of a segment's underlying 
managed-ledger topic,
-     * suitable for the standard {@link org.apache.pulsar.client.admin.Topics} 
admin API.
-     * The segment-owning broker is discovered by the admin client's normal 
bundle routing.
-     */
-    private String toSegmentUnderlyingPersistentName(SegmentInfo segment) {
-        TopicName segmentTopicName = SegmentTopicName.fromParent(
-                topicName, segment.hashRange(), segment.segmentId());
-        return "persistent://" + segmentTopicName.getTenant() + "/"
-                + segmentTopicName.getNamespacePortion() + "/"
-                + segmentTopicName.getLocalName();
-    }
-
     private CompletableFuture<Void> terminateSegmentTopic(String 
segmentTopicName) {
         try {
             return brokerService.getPulsar().getAdminClient()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 2eb78a8de40..183e66b68d9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.ScalableTopics;
 import org.apache.pulsar.client.admin.Topics;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ScalableTopicStats;
@@ -121,14 +120,14 @@ public class ScalableTopicControllerTest {
         // Default: all admin ops succeed.
         when(topics.getSubscriptionsAsync(anyString()))
                 
.thenReturn(CompletableFuture.completedFuture(java.util.List.of()));
-        when(topics.createSubscriptionAsync(anyString(), anyString(), 
any(MessageId.class)))
-                .thenReturn(CompletableFuture.completedFuture(null));
-        when(topics.deleteSubscriptionAsync(anyString(), anyString(), 
anyBoolean()))
-                .thenReturn(CompletableFuture.completedFuture(null));
         when(scalableTopics.createSegmentAsync(anyString(), any()))
                 .thenReturn(CompletableFuture.completedFuture(null));
         when(scalableTopics.terminateSegmentAsync(anyString()))
                 .thenReturn(CompletableFuture.completedFuture(null));
+        when(scalableTopics.createSegmentSubscriptionAsync(anyString(), 
anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(scalableTopics.deleteSegmentSubscriptionAsync(anyString(), 
anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
 
         controller = newController(topicName);
     }
@@ -337,9 +336,9 @@ public class ScalableTopicControllerTest {
                 resources.getSubscriptionAsync(topicName, "sub-stream").get();
         assertTrue(persisted.isPresent());
         assertEquals(persisted.get().type(), SubscriptionType.STREAM);
-        // Propagated to every active segment via 
admin.topics().createSubscriptionAsync().
-        verify(topics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
-                .createSubscriptionAsync(anyString(), anyString(), 
any(MessageId.class));
+        // Propagated to every active segment via the segment-subscription 
admin endpoint.
+        verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
+                .createSegmentSubscriptionAsync(anyString(), anyString());
     }
 
     @Test
@@ -370,8 +369,8 @@ public class ScalableTopicControllerTest {
         controller.deleteSubscription("sub-a").get();
         assertFalse(resources.getSubscriptionAsync(topicName, 
"sub-a").get().isPresent());
         // Propagated a delete to every segment (all segments incl. any sealed 
ones).
-        verify(topics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS))
-                .deleteSubscriptionAsync(anyString(), anyString(), 
anyBoolean());
+        verify(scalableTopics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS))
+                .deleteSegmentSubscriptionAsync(anyString(), anyString());
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java
new file mode 100644
index 00000000000..d55495999d5
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.policies.data.ScalableSubscriptionType;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code admin.scalableTopics().createSubscription(...)}: the 
admin
+ * API must materialize a cursor on every active segment so a consumer that
+ * subscribes <em>after</em> messages are produced still receives those
+ * messages — the whole point of pre-creating the subscription.
+ *
+ * <p>The behavioural assertion (a late consumer sees pre-subscription 
messages)
+ * is the user-facing guarantee, and any regression in
+ * {@code ScalableTopicController.createSubscriptionOnSegment} — which converts
+ * each {@code SegmentInfo} to the underlying {@code persistent://} topic and
+ * pre-creates the cursor through the standard topic admin API — would surface
+ * here as messages going missing.
+ */
+public class V5ScalableSubscriptionAdminTest extends V5ClientBaseTest {
+
+    @Test
+    public void testPreCreatedSubscriptionRetainsPreProductionMessages() 
throws Exception {
+        String topic = newScalableTopic(3);
+        String subscription = "pre-created-sub";
+
+        // Pre-create the subscription on the scalable topic. This must 
materialize a
+        // cursor on every active segment so that subsequent produces are 
retained
+        // until the consumer drains them.
+        admin.scalableTopics().createSubscription(topic, subscription,
+                ScalableSubscriptionType.QUEUE);
+
+        // Produce *before* any consumer subscribes.
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        int n = 30;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "msg-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Subscribe with the SAME subscription name. If createSubscription 
truly
+        // pre-created cursors on every segment, the consumer must receive 
every
+        // message produced above. If it didn't, the consumer attaches at 
"latest"
+        // by default and drops the entire backlog.
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Set<String> received = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received.size() < n && System.currentTimeMillis() < deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                consumer.acknowledge(msg.id());
+            }
+        }
+        assertEquals(received, sent,
+                "pre-created subscription must retain every message produced 
before consumer subscribed");
+    }
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index 10da169d14e..f9fb01444a6 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -313,6 +313,32 @@ public interface ScalableTopics {
      */
     CompletableFuture<Void> deleteSegmentAsync(String segmentTopic, boolean 
force);
 
+    /**
+     * Create a subscription cursor on the given segment topic at the earliest 
position.
+     * The call routes to the broker that owns the segment.
+     *
+     * <p>Used internally by {@link 
org.apache.pulsar.broker.service.scalable.ScalableTopicController
+     * ScalableTopicController} to fan a new scalable-topic subscription out 
across every
+     * active segment so a future consumer doesn't drop the backlog.
+     *
+     * @param segmentTopic Full segment topic name ({@code 
segment://tenant/namespace/topic/descriptor})
+     * @param subscription Subscription name
+     */
+    CompletableFuture<Void> createSegmentSubscriptionAsync(String 
segmentTopic, String subscription);
+
+    /**
+     * Delete a subscription cursor on the given segment topic. The call 
routes to the broker
+     * that owns the segment.
+     *
+     * <p>Used internally by {@link 
org.apache.pulsar.broker.service.scalable.ScalableTopicController
+     * ScalableTopicController} when a scalable-topic subscription is deleted, 
so no orphan
+     * cursors remain on any segment in the DAG.
+     *
+     * @param segmentTopic Full segment topic name ({@code 
segment://tenant/namespace/topic/descriptor})
+     * @param subscription Subscription name
+     */
+    CompletableFuture<Void> deleteSegmentSubscriptionAsync(String 
segmentTopic, String subscription);
+
     /**
      * Returns the number of unconsumed entries in the given subscription's 
cursor on the
      * segment topic — i.e. the per-subscription backlog. The call routes to 
the broker
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index 5a040123d2e..57d26c7011a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -290,6 +290,28 @@ public class ScalableTopicsImpl extends BaseResource 
implements ScalableTopics {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public CompletableFuture<Void> createSegmentSubscriptionAsync(String 
segmentTopic,
+                                                                   String 
subscription) {
+        TopicName tn = TopicName.get(segmentTopic);
+        WebTarget path = adminSegments
+                .path(tn.getTenant()).path(tn.getNamespacePortion())
+                .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+                .path("subscription").path(subscription);
+        return asyncPutRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteSegmentSubscriptionAsync(String 
segmentTopic,
+                                                                   String 
subscription) {
+        TopicName tn = TopicName.get(segmentTopic);
+        WebTarget path = adminSegments
+                .path(tn.getTenant()).path(tn.getNamespacePortion())
+                .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+                .path("subscription").path(subscription);
+        return asyncDeleteRequest(path);
+    }
+
     @Override
     public CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String 
segmentTopic,
                                                                       String 
subscription) {

Reply via email to