This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 012b85acaae974ee451f36560ce1a1295fb3b221 Author: xiangying <[email protected]> AuthorDate: Wed Dec 7 16:03:47 2022 +0800 Revert "PIP-105: new API to get subscription properties (#16095)" This reverts commit d2ff2936097f8920b4521923685b7623eda5fcdc. --- .../broker/admin/impl/PersistentTopicsBase.java | 114 --------------------- .../pulsar/broker/admin/v2/PersistentTopics.java | 36 ------- .../broker/admin/AdminApiSubscriptionTest.java | 36 ------- .../org/apache/pulsar/client/admin/Topics.java | 16 --- .../pulsar/client/admin/internal/TopicsImpl.java | 29 ------ .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 - .../org/apache/pulsar/admin/cli/CmdTopics.java | 20 ---- .../pulsar/tests/integration/cli/CLITest.java | 26 ----- 8 files changed, 281 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 956679df95b..698da80265d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1636,35 +1636,6 @@ public class PersistentTopicsBase extends AdminResource { }); } - private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse, - String subName, - boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME)) - .thenCompose(__ -> getTopicReferenceAsync(topicName)) - .thenApply((Topic topic) -> { - Subscription sub = topic.getSubscription(subName); - if (sub == null) { - throw new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topicName.toString(), subName)); - } - return sub.getSubscriptionProperties(); - }).thenAccept((Map<String, String> properties) -> { - if (properties == null) { - properties = Collections.emptyMap(); - } - asyncResponse.resume(Response.ok(properties).build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause); - } - asyncResponse.resume(new RestException(cause)); - return null; - }); - } - protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { CompletableFuture<Void> future; @@ -2413,91 +2384,6 @@ public class PersistentTopicsBase extends AdminResource { }); } - protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName, - boolean authoritative) { - CompletableFuture<Void> future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - - future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { - if (topicName.isPartitioned()) { - internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, - authoritative); - } else { - getPartitionedTopicMetadataAsync(topicName, - authoritative, false).thenAcceptAsync(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List<CompletableFuture<Map<String, String>>> futures = Lists.newArrayList(); - - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics() - .getSubscriptionPropertiesAsync(topicNamePartition.toString(), - subName)); - } catch (Exception e) { - log.error("[{}] Failed to update properties for subscription {} {}", - clientAppId(), topicNamePartition, subName, - e); - asyncResponse.resume(new RestException(e)); - return; - } - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topicName.toString(), subName))); - return null; - } else { - log.error("[{}] Failed to get properties for subscription {} {}", - clientAppId(), topicName, subName, t); - asyncResponse.resume(new RestException(t)); - return null; - } - } - - Map<String, String> aggregatedResult = new HashMap<>(); - futures.forEach(f -> { - // in theory all the partitions have the same properties - try { - aggregatedResult.putAll(f.get()); - } catch (Exception impossible) { - // we already waited for this Future - asyncResponse.resume(new RestException(impossible)); - } - }); - - asyncResponse.resume(Response.ok(aggregatedResult).build()); - return null; - }); - } else { - internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, - authoritative); - } - }, pulsar().getExecutor()).exceptionally(ex -> { - log.error("[{}] Failed to update properties for subscription {} from topic {}", - clientAppId(), subName, topicName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to update subscription {} from topic {}", - clientAppId(), subName, topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative, MessageIdImpl messageId, boolean isExcluded, int batchIndex) { CompletableFuture<Void> ret; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 00cc19e0c98..56e3799c475 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1564,42 +1564,6 @@ public class PersistentTopics extends PersistentTopicsBase { } } - @GET - @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties") - @ApiOperation(value = "Replaces all the properties on the given subscription") - @ApiResponses(value = { - @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), - @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" - + "subscriber is not authorized to access this operation"), - @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), - @ApiResponse(code = 405, message = "Method Not Allowed"), - @ApiResponse(code = 500, message = "Internal server error"), - @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") - }) - public void getSubscriptionProperties( - @Suspended final AsyncResponse asyncResponse, - @ApiParam(value = "Specify the tenant", required = true) - @PathParam("tenant") String tenant, - @ApiParam(value = "Specify the namespace", required = true) - @PathParam("namespace") String namespace, - @ApiParam(value = "Specify topic name", required = true) - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Subscription", required = true) - @PathParam("subName") String encodedSubName, - @ApiParam(value = "Is authentication required to perform this operation") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateTopicName(tenant, namespace, encodedTopic); - internalGetSubscriptionProperties(asyncResponse, decode(encodedSubName), - authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } - } - @POST @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor") @ApiOperation(value = "Reset subscription to message position closest to given position.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java index 521ef4df1c8..f7af28ddf41 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java @@ -152,9 +152,6 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) .getSubscriptions().get(subscriptionName); assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); - - Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName); - assertEquals(value, props.get("foo")); } // properties are never null, but an empty map @@ -162,9 +159,6 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) .getSubscriptions().get(subscriptionName2); assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); - - Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName2); - assertTrue(props.isEmpty()); } // aggregated properties @@ -172,21 +166,12 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { .getSubscriptions().get(subscriptionName); assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); - Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName); - assertEquals(value, props.get("foo")); - } else { SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); - Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName); - assertEquals(value, props.get("foo")); - SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2); assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty()); - - Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2); - assertTrue(props2.isEmpty()); } // clear the properties on subscriptionName @@ -198,9 +183,6 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) .getSubscriptions().get(subscriptionName); assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); - - Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName); - assertTrue(props.isEmpty()); } // aggregated properties @@ -208,15 +190,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { .getSubscriptions().get(subscriptionName); assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); - Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName); - assertTrue(props.isEmpty()); - } else { SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); - - Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName); - assertTrue(props.isEmpty()); } // update the properties on subscriptionName @@ -228,9 +204,6 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) .getSubscriptions().get(subscriptionName); assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); - - Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName); - assertEquals(value, props.get("foo")); } // aggregated properties @@ -238,21 +211,12 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { .getSubscriptions().get(subscriptionName); assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); - Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName); - assertEquals(value, props.get("foo")); - } else { SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); - Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName); - assertEquals(value, props.get("foo")); - SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2); assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty()); - - Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2); - assertTrue(props2.isEmpty()); } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 2fa14e54434..73f9a199a1b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1764,15 +1764,6 @@ public interface Topics { void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties) throws PulsarAdminException; - /** - * Get Subscription Properties on a topic subscription. - * @param topic - * @param subName - * @throws PulsarAdminException - */ - Map<String, String> getSubscriptionProperties(String topic, String subName) - throws PulsarAdminException; - /** * Reset cursor position on a topic subscription. * @@ -1807,13 +1798,6 @@ public interface Topics { CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName, Map<String, String> subscriptionProperties); - /** - * Get Subscription Properties on a topic subscription. - * @param topic - * @param subName - */ - CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName); - /** * Reset cursor position on a topic subscription. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index a2360fb5b73..dbdaffd9e5c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1179,12 +1179,6 @@ public class TopicsImpl extends BaseResource implements Topics { sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties)); } - @Override - public Map<String, String> getSubscriptionProperties(String topic, String subName) - throws PulsarAdminException { - return sync(() -> getSubscriptionPropertiesAsync(topic, subName)); - } - @Override public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName, Map<String, String> subscriptionProperties) { @@ -1198,29 +1192,6 @@ public class TopicsImpl extends BaseResource implements Topics { return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON)); } - @Override - public CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName) { - TopicName tn = validateTopic(topic); - String encodedSubName = Codec.encode(subName); - WebTarget path = topicPath(tn, "subscription", encodedSubName, - "properties"); - final CompletableFuture<Map<String, String>> future = new CompletableFuture<>(); - asyncGetRequest(path, - new InvocationCallback<Map<String, String>>() { - - @Override - public void completed(Map<String, String> response) { - future.complete(response); - } - - @Override - public void failed(Throwable throwable) { - future.completeExceptionally(getApiException(throwable.getCause())); - } - }); - return future; - } - @Override public void resetCursor(String topic, String subName, MessageId messageId , boolean isExcluded) throws PulsarAdminException { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 067e08b5599..8e7e339b536 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1422,10 +1422,6 @@ public class PulsarAdminToolTest { cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear")); verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>()); - cmdTopics = new CmdTopics(() -> admin); - cmdTopics.run(split("get-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1")); - verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1"); - cmdTopics = new CmdTopics(() -> admin); props = new HashMap<>(); props.put("a", "b"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 562d7993f5c..8fc8d5646d7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -74,7 +74,6 @@ import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.DateFormatter; -import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.RelativeTimeUtil; @Getter @@ -100,7 +99,6 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("unsubscribe", new DeleteSubscription()); jcommander.addCommand("create-subscription", new CreateSubscription()); jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties()); - jcommander.addCommand("get-subscription-properties", new GetSubscriptionProperties()); jcommander.addCommand("stats", new GetStats()); jcommander.addCommand("stats-internal", new GetInternalStats()); @@ -970,24 +968,6 @@ public class CmdTopics extends CmdBase { } } - @Parameters(commandDescription = "Get the properties of a subscription on a topic") - private class GetSubscriptionProperties extends CliCommand { - @Parameter(description = "persistent://tenant/namespace/topic", required = true) - private java.util.List<String> params; - - @Parameter(names = { "-s", - "--subscription" }, description = "Subscription to describe", required = true) - private String subscriptionName; - - @Override - void run() throws Exception { - String topic = validateTopicName(params); - Map<String, String> result = getTopics().getSubscriptionProperties(topic, subscriptionName); - // Ensure we are using JSON and not Java toString() - System.out.println(ObjectMapperFactory.getThreadLocal().writeValueAsString(result)); - } - } - @Parameters(commandDescription = "Reset position for subscription to a position that is closest to " + "timestamp or messageId.") diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java index cda2347b4dc..a1e417ca547 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java @@ -207,18 +207,6 @@ public class CLITest extends PulsarTestSuite { ); resultUpdate.assertNoOutput(); - ContainerExecResult resultGet = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "topics", - "get-subscription-properties", - "persistent://public/default/" + topic, - "--subscription", - "" + subscriptionPrefix + i - ); - assertEquals( - resultGet.getStdout().trim(), "{\"a\":\"e\"}", - "unexpected output " + resultGet.getStdout() + " - error " + resultGet.getStderr()); - ContainerExecResult resultClear = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "topics", @@ -229,20 +217,6 @@ public class CLITest extends PulsarTestSuite { "" + subscriptionPrefix + i ); resultClear.assertNoOutput(); - - ContainerExecResult resultGetAfterClear = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "topics", - "get-subscription-properties", - "persistent://public/default/" + topic, - "--subscription", - "" + subscriptionPrefix + i - ); - assertEquals( - resultGetAfterClear.getStdout().trim(), "{}", - "unexpected output " + resultGetAfterClear.getStdout() - + " - error " + resultGetAfterClear.getStderr()); - i++; } }
