This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 98355ca Added REST handler to create a subscription on a topic (#1151) 98355ca is described below commit 98355ca745b53f7864a9aac0ae709879d5bde48e Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Jan 31 09:11:48 2018 -0800 Added REST handler to create a subscription on a topic (#1151) --- .../pulsar/broker/admin/PersistentTopics.java | 87 +++++++++++++++-- .../broker/admin/CreateSubscriptionTest.java | 106 +++++++++++++++++++++ .../pulsar/client/admin/PersistentTopics.java | 46 ++++++++- .../admin/internal/PersistentTopicsImpl.java | 34 +++++-- .../pulsar/admin/cli/CmdPersistentTopics.java | 34 ++++++- .../pulsar/admin/cli/PulsarAdminToolTest.java | 19 ++-- 6 files changed, 297 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java index f241cf5..da8bb0a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java @@ -24,14 +24,12 @@ import static org.apache.pulsar.common.util.Codec.decode; import java.io.IOException; import java.io.OutputStream; -import java.time.Instant; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -66,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -77,6 +76,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.MessageId; @@ -106,6 +106,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.zafarkhaja.semver.Version; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -116,7 +117,6 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import com.github.zafarkhaja.semver.Version; /** */ @@ -615,7 +615,7 @@ public class PersistentTopics extends AdminResource { DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) { validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace)); - } + } List<String> subscriptions = Lists.newArrayList(); PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative); @@ -656,7 +656,7 @@ public class PersistentTopics extends AdminResource { validateAdminAndClientPermission(dn); if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) { validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace)); - } + } validateDestinationOwnership(dn, authoritative); Topic topic = getTopicReference(dn); return topic.getStats(); @@ -676,7 +676,7 @@ public class PersistentTopics extends AdminResource { validateAdminAndClientPermission(dn); if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) { validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace)); - } + } validateDestinationOwnership(dn, authoritative); Topic topic = getTopicReference(dn); return topic.getInternalStats(); @@ -1024,6 +1024,67 @@ public class PersistentTopics extends AdminResource { } } + @PUT + @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subscriptionName}") + @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "Creates a subscription on the topic at the specified message id") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), + @ApiResponse(code = 405, message = "Not supported for partitioned topics") }) + public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, + @PathParam("subscriptionName") String subscriptionName, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) throws PulsarServerException { + destination = decode(destination); + DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); + if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) { + validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace)); + } + log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), destination, + subscriptionName, messageId); + + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, + destination, authoritative); + + try { + if (partitionMetadata.partitions > 0) { + // Create the subscription on each partition + List<CompletableFuture<Void>> futures = Lists.newArrayList(); + PulsarAdmin admin = pulsar().getAdminClient(); + + for (int i = 0; i < partitionMetadata.partitions; i++) { + futures.add(admin.persistentTopics().createSubscriptionAsync(dn.getPartition(i).toString(), + subscriptionName, messageId)); + } + + FutureUtil.waitForAll(futures).join(); + } else { + validateAdminOperationOnDestination(dn, authoritative); + + PersistentTopic topic = (PersistentTopic) getOrCreateTopic(dn); + + if (topic.getSubscriptions().containsKey(subscriptionName)) { + throw new RestException(Status.CONFLICT, "Subscription already exists for topic"); + } + + PersistentSubscription subscription = (PersistentSubscription) topic + .createSubscription(subscriptionName).get(); + subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); + log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), dn, + subscriptionName, messageId); + } + } catch (Exception e) { + Throwable t = e.getCause(); + log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), dn, subscriptionName, + messageId, e); + if (t instanceof SubscriptionInvalidCursorPosition) { + throw new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for position specified: " + t.getMessage()); + } else { + throw new RestException(e); + } + } + } + @POST @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor") @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.") @@ -1324,10 +1385,10 @@ public class PersistentTopics extends AdminResource { dn.toString(), ex.getMessage(), ex); throw ex; } - + String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName()); - + // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can // serve/redirect request else fail partitioned-metadata-request so, client fails while creating // producer/consumer @@ -1361,6 +1422,14 @@ public class PersistentTopics extends AdminResource { } } + private Topic getOrCreateTopic(DestinationName dn) { + try { + return pulsar().getBrokerService().getTopic(dn.toString()).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RestException(e); + } + } + /** * Get the Subscription object reference from the Topic reference */ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java new file mode 100644 index 0000000..6f759d8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.core.Response.Status; + +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.naming.DestinationName; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; + +public class CreateSubscriptionTest extends MockedPulsarServiceBaseTest { + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + } + + @AfterMethod + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void createSubscriptionSingleTopic() throws Exception { + String topic = "persistent://prop-xyz/use/ns1/my-topic"; + admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest); + + // Create should fail if the subscription already exists + try { + admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest); + fail("Should have failed"); + } catch (ConflictException e) { + assertEquals(((ClientErrorException) e.getCause()).getResponse().getStatus(), + Status.CONFLICT.getStatusCode()); + } + + assertEquals(admin.persistentTopics().getSubscriptions(topic), Lists.newArrayList("sub-1")); + + Producer p1 = pulsarClient.createProducer(topic); + p1.send("test-1".getBytes()); + p1.send("test-2".getBytes()); + MessageId m3 = p1.send("test-3".getBytes()); + + assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-1").msgBacklog, 3); + + admin.persistentTopics().createSubscription(topic, "sub-2", MessageId.latest); + assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-2").msgBacklog, 0); + + admin.persistentTopics().createSubscription(topic, "sub-3", MessageId.earliest); + assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-3").msgBacklog, 3); + + admin.persistentTopics().createSubscription(topic, "sub-5", m3); + assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-5").msgBacklog, 1); + } + + @Test + public void createSubscriptionOnPartitionedTopic() throws Exception { + String topic = "persistent://prop-xyz/use/ns1/my-partitioned-topic"; + admin.persistentTopics().createPartitionedTopic(topic, 10); + + admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest); + + // Create should fail if the subscription already exists + try { + admin.persistentTopics().createSubscription(topic, "sub-1", MessageId.latest); + fail("Should have failed"); + } catch (Exception e) { + // Expected + } + + for (int i = 0; i < 10; i++) { + assertEquals( + admin.persistentTopics().getSubscriptions(DestinationName.get(topic).getPartition(i).toString()), + Lists.newArrayList("sub-1")); + } + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java index 093ae3e..70642fe 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java @@ -196,7 +196,7 @@ public interface PersistentTopics { * @return a future that can be used to track when the partitioned topic is created */ CompletableFuture<Void> createPartitionedTopicAsync(String destination, int numPartitions); - + /** * Update number of partitions of a non-global partitioned topic. * <p> @@ -208,7 +208,7 @@ public interface PersistentTopics { * Destination name * @param numPartitions * Number of new partitions of already exist partitioned-topic - * + * * @return a future that can be used to track when the partitioned topic is updated */ void updatePartitionedTopic(String destination, int numPartitions) throws PulsarAdminException; @@ -224,7 +224,7 @@ public interface PersistentTopics { * Destination name * @param numPartitions * Number of new partitions of already exist partitioned-topic - * + * * @return a future that can be used to track when the partitioned topic is updated */ CompletableFuture<Void> updatePartitionedTopicAsync(String destination, int numPartitions); @@ -311,7 +311,7 @@ public interface PersistentTopics { * @return a future that can be used to track when the topic is deleted */ CompletableFuture<Void> deleteAsync(String destination); - + /** * Unload a topic. * <p> @@ -798,6 +798,42 @@ public interface PersistentTopics { CompletableFuture<List<Message>> peekMessagesAsync(String destination, String subName, int numMessages); /** + * Create a new subscription on a topic + * + * @param destination + * Destination name + * @param subscriptionName + * Subscription name + * @param messageId + * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest}, + * {@link MessageId#earliest} or a specific message id. + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws ConflictException + * Subscription already exists + * @throws NotAllowedException + * Command disallowed for requested resource + * @throws PulsarAdminException + * Unexpected error + */ + void createSubscription(String destination, String subscriptionName, MessageId messageId) + throws PulsarAdminException; + + /** + * Create a new subscription on a topic + * + * @param destination + * Destination name + * @param subscriptionName + * Subscription name + * @param messageId + * The {@link MessageId} on where to initialize the subscription. It could be {@link MessageId#latest}, + * {@link MessageId#earliest} or a specific message id. + */ + CompletableFuture<Void> createSubscriptionAsync(String destination, String subscriptionName, MessageId messageId); + + /** * Reset cursor position on a topic subscription * * @param destination @@ -829,7 +865,7 @@ public interface PersistentTopics { * reset subscription to position closest to time in ms since epoch */ CompletableFuture<Void> resetCursorAsync(String destination, String subName, long timestamp); - + /** * Reset cursor position on a topic subscription * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java index cc1d96c..01ede00 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java @@ -181,7 +181,7 @@ public class PersistentTopicsImpl extends BaseResource implements PersistentTopi persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"), Entity.entity(numPartitions, MediaType.APPLICATION_JSON)); } - + @Override public PartitionedTopicMetadata getPartitionedTopicMetadata(String destination) throws PulsarAdminException { try { @@ -586,8 +586,8 @@ public class PersistentTopicsImpl extends BaseResource implements PersistentTopi peekMessagesAsync(destination, subName, numMessages, Lists.newArrayList(), future, 1); return future; } - - + + private void peekMessagesAsync(String destination, String subName, int numMessages, List<Message> messages, CompletableFuture<List<Message>> future, int nthMessage) { if (numMessages <= 0) { @@ -598,8 +598,8 @@ public class PersistentTopicsImpl extends BaseResource implements PersistentTopi // if peeking first message succeeds, we know that the topic and subscription exists peekNthMessage(destination, subName, nthMessage).handle((r, ex) -> { if (ex != null) { - // if we get a not found exception, it means that the position for the message we are trying to get - // does not exist. At this point, we can return the already found messages. + // if we get a not found exception, it means that the position for the message we are trying to get + // does not exist. At this point, we can return the already found messages. if (ex instanceof NotFoundException) { log.warn("Exception '{}' occured while trying to peek Messages.", ex.getMessage()); future.complete(messages); @@ -617,6 +617,28 @@ public class PersistentTopicsImpl extends BaseResource implements PersistentTopi } @Override + public void createSubscription(String destination, String subscriptionName, MessageId messageId) + throws PulsarAdminException { + try { + DestinationName ds = validateTopic(destination); + String encodedSubName = Codec.encode(subscriptionName); + request(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("subscription") + .path(encodedSubName)).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public CompletableFuture<Void> createSubscriptionAsync(String destination, String subscriptionName, + MessageId messageId) { + DestinationName ds = validateTopic(destination); + String encodedSubName = Codec.encode(subscriptionName); + return asyncPutRequest(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()) + .path("subscription").path(encodedSubName), Entity.entity(messageId, MediaType.APPLICATION_JSON)); + } + + @Override public void resetCursor(String destination, String subName, long timestamp) throws PulsarAdminException { try { DestinationName ds = validateTopic(destination); @@ -769,7 +791,7 @@ public class PersistentTopicsImpl extends BaseResource implements PersistentTopi log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex); } buf.release(); - singleMessageMetadataBuilder.recycle(); + singleMessageMetadataBuilder.recycle(); } return ret; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index d7f7d51..5027e69 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -62,6 +62,7 @@ public class CmdPersistentTopics extends CmdBase { jcommander.addCommand("unload", new UnloadCmd()); jcommander.addCommand("subscriptions", new ListSubscriptions()); jcommander.addCommand("unsubscribe", new DeleteSubscription()); + jcommander.addCommand("create-subscription", new CreateSubscription()); jcommander.addCommand("stats", new GetStats()); jcommander.addCommand("stats-internal", new GetInternalStats()); jcommander.addCommand("info-internal", new GetInternalInfo()); @@ -418,6 +419,35 @@ public class CmdPersistentTopics extends CmdBase { } } + @Parameters(commandDescription = "Create a new subscription on a topic") + private class CreateSubscription extends CliCommand { + @Parameter(description = "persistent://property/cluster/namespace/destination", required = true) + private java.util.List<String> params; + + @Parameter(names = { "-s", + "--subscription" }, description = "Subscription to reset position on", required = true) + private String subscriptionName; + + @Parameter(names = { "--messageId", + "-m" }, description = "messageId where to create the subscription. It can be either 'latest', 'earliest' or (ledgerId:entryId)", required = false) + private String messageIdStr = "latest"; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + MessageId messageId; + if (messageIdStr.equals("latest")) { + messageId = MessageId.latest; + } else if (messageIdStr.equals("earliest")) { + messageId = MessageId.earliest; + } else { + messageId = validateMessageIdString(messageIdStr); + } + + persistentTopics.createSubscription(persistentTopic, subscriptionName, messageId); + } + } + @Parameters(commandDescription = "Reset position for subscription to position closest to timestamp or messageId") private class ResetCursor extends CliCommand { @Parameter(description = "persistent://property/cluster/namespace/destination", required = true) @@ -430,7 +460,7 @@ public class CmdPersistentTopics extends CmdBase { @Parameter(names = { "--time", "-t" }, description = "time in minutes to reset back to (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = false) private String resetTimeStr; - + @Parameter(names = { "--messageId", "-m" }, description = "messageId to reset back to (ledgerId:entryId)", required = false) private String resetMessageIdStr; @@ -534,7 +564,7 @@ public class CmdPersistentTopics extends CmdBase { return Integer.parseInt(s); } } - + private MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException { String[] messageId = resetMessageIdStr.split(":"); try { diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 411ef7e..00f18a9 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -36,6 +36,8 @@ import org.apache.pulsar.client.admin.PersistentTopics; import org.apache.pulsar.client.admin.Properties; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.ResourceQuotas; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; @@ -96,7 +98,7 @@ public class PulsarAdminToolTest { brokerStats.run(split("monitoring-metrics")); verify(mockBrokerStats).getMetrics(); } - + @Test void getOwnedNamespaces() throws Exception { PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); @@ -132,7 +134,7 @@ public class PulsarAdminToolTest { clusters.run(split("delete use")); verify(mockClusters).deleteCluster("use"); - + clusters.run(split("list-failure-domains use")); verify(mockClusters).getFailureDomains("use"); @@ -303,19 +305,19 @@ public class PulsarAdminToolTest { namespaces.run(split("get-message-ttl myprop/clust/ns1")); verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1"); - + namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group")); verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group"); namespaces.run(split("get-anti-affinity-group myprop/clust/ns1")); verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1"); - + namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group")); verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group"); namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 ")); verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1"); - + namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M")); verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1)); @@ -444,6 +446,9 @@ public class PulsarAdminToolTest { topics.run(split("expire-messages-all-subscriptions persistent://myprop/clust/ns1/ds1 -t 100")); verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1", 100); + topics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest")); + verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest); + topics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32); @@ -494,10 +499,10 @@ public class PulsarAdminToolTest { topics.run(split("create-partitioned-topic non-persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("non-persistent://myprop/clust/ns1/ds1", 32); - + topics.run(split("list myprop/clust/ns1")); verify(mockTopics).getList("myprop/clust/ns1"); - + topics.run(split("list-in-bundle myprop/clust/ns1 --bundle 0x23d70a30_0x26666658")); verify(mockTopics).getListInBundle("myprop/clust/ns1", "0x23d70a30_0x26666658"); -- To stop receiving notification emails like this one, please contact mme...@apache.org.