This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit aba36c24801cb2a750d16add3e074f5dc095b593 Author: Ruguo Yu <[email protected]> AuthorDate: Wed Nov 24 08:37:18 2021 +0800 [Broker] Correct param of delete method for v1 topic (#12936) (cherry picked from commit e6d9df81c446870107dbb8d8e454b11b71cc9255) --- .../pulsar/broker/admin/v1/PersistentTopics.java | 10 ++- .../java/org/apache/pulsar/schema/SchemaTest.java | 90 ++++++++++++++++++++++ 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index fee8707..a0309ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -232,10 +232,11 @@ public class PersistentTopics extends PersistentTopicsBase { @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalDeletePartitionedTopic(asyncResponse, authoritative, force, false); + internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { @@ -270,9 +271,10 @@ public class PersistentTopics extends PersistentTopicsBase { public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) { validateTopicName(property, cluster, namespace, encodedTopic); - internalDeleteTopic(authoritative, force); + internalDeleteTopic(authoritative, force, deleteSchema); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index f13b266..bbea161 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -769,6 +769,96 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } @Test + public void testDeleteTopicAndSchemaForV1() throws Exception { + final String tenant = PUBLIC_TENANT; + final String cluster = CLUSTER_NAME; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "not-partitioned-topic"; + final String topic2 = "persistent://" + tenant + "/" + cluster + "/" + namespace + "/partitioned-topic"; + + // persistent, not-partitioned v1/topic + final String topic1 = TopicName.get( + TopicDomain.persistent.value(), + tenant, + cluster, + namespace, + topicOne).toString(); + + // persistent, partitioned v1/topic + admin.topics().createPartitionedTopic(topic2, 1); + + @Cleanup + Producer<Schemas.PersonOne> p1_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class)) + .topic(topic1) + .create(); + + @Cleanup + Producer<Schemas.PersonThree> p1_2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class)) + .topic(topic1) + .create(); + @Cleanup + Producer<Schemas.PersonThree> p2_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class)) + .topic(topic2) + .create(); + + List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures1 = + this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic1).getSchemaName()).get(); + FutureUtil.waitForAll(schemaFutures1).get(); + List<SchemaRegistry.SchemaAndMetadata> schemas1 = schemaFutures1.stream().map(future -> { + try { + return future.get(); + } catch (Exception e) { + return null; + } + }).collect(Collectors.toList()); + assertEquals(schemas1.size(), 2); + for (SchemaRegistry.SchemaAndMetadata schema : schemas1) { + assertNotNull(schema); + } + + List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures2 = + this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic2).getSchemaName()).get(); + FutureUtil.waitForAll(schemaFutures2).get(); + List<SchemaRegistry.SchemaAndMetadata> schemas2 = schemaFutures2.stream().map(future -> { + try { + return future.get(); + } catch (Exception e) { + return null; + } + }).collect(Collectors.toList()); + assertEquals(schemas2.size(), 1); + for (SchemaRegistry.SchemaAndMetadata schema : schemas2) { + assertNotNull(schema); + } + + // not-force and not-delete-schema when delete topic + try { + admin.topics().delete(topic1, false, false); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions")); + } + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2); + try { + admin.topics().deletePartitionedTopic(topic2, false, false); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions")); + } + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 1); + + // force and delete-schema when delete topic + admin.topics().delete(topic1, true, true); + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 0); + admin.topics().deletePartitionedTopic(topic2, true, true); + assertEquals(this.getPulsar().getSchemaRegistryService() + .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 0); + } + + @Test public void testProducerMultipleSchemaMessages() throws Exception { final String tenant = PUBLIC_TENANT; final String namespace = "test-namespace-" + randomName(16);
