BewareMyPower commented on code in PR #22797: URL: https://github.com/apache/pulsar/pull/22797#discussion_r1625271648
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java:
##########
@@ -113,6 +116,52 @@ public void testPartitionedShadowTopicSetup() throws Exception {
         Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition);
     }
 
+    @Test
+    public void testPartitionedShadowTopicProduceAndConsume() throws Exception {
+        String sourceTopic = newShadowSourceTopicName();
+        String shadowTopic = sourceTopic + "-shadow";
+        admin.topics().createPartitionedTopic(sourceTopic, 3);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+
+        // We should not allow to set with the shadow partition topic which contains `-partition-n`.
+        Assert.assertThrows(PulsarAdminException.class, ()-> admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic+"-partition-0")));
+        admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("test")
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("msg-" + i);
+        }
+
+        Set<String> set = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            Message<String> msg = consumer.receive();
+            set.add(msg.getValue());
+        }
+        for (int i = 0; i < 10; i++) {
+            Assert.assertTrue(set.contains("msg-" + i));
+        }
+    }
+
+    @Test
+    public void testPartitionedShadowTopicExpansion() throws Exception {
+        String sourceTopic = newShadowSourceTopicName();
+        String shadowTopic = sourceTopic + "-shadow";
+        admin.topics().createPartitionedTopic(sourceTopic, 1);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+        admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));
+
+        Assert.assertThrows(PulsarAdminException.class, () -> admin.topics().updatePartitionedTopic(sourceTopic, 3));
+
+        admin.topics().updatePartitionedTopic(shadowTopic, 3);
+
+        admin.topics().updatePartitionedTopic(sourceTopic, 3);

Review Comment:
   I think we can separate this PR into two parts.
   1. The shadow topic support for partitioned topics.
   2. The validation when expanding the partitions of a partitioned topic that has shadow topics.

   IMO, the 2nd part does not necessarily need to be included in this PR. Running two `updatePartitionedTopic` RPCs seems a little complicated. IMO, we can update the partitions of the shadow topics as well when updating the source topic. It might require a PIP to do that.
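
   To make the suggested alternative concrete, here is a rough sketch of what a single cascading update could look like. It is a toy in-memory model, not Pulsar broker or admin code: the `ShadowTopicModel` class and its fields are hypothetical, and its method names only mirror the admin calls used in the test above. The update order (shadows first, then source) is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model for illustration only; not Pulsar's implementation.
final class ShadowTopicModel {
    // Partition count per partitioned topic (source or shadow).
    private final Map<String, Integer> partitions = new HashMap<>();
    // Shadow topics registered for each source topic.
    private final Map<String, List<String>> shadowsBySource = new HashMap<>();

    void createPartitionedTopic(String topic, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        partitions.put(topic, numPartitions);
    }

    // Assumption: a shadow topic starts with the same partition count as its source.
    void createShadowTopic(String shadowTopic, String sourceTopic) {
        int sourcePartitions = getPartitions(sourceTopic);
        partitions.put(shadowTopic, sourcePartitions);
        shadowsBySource.computeIfAbsent(sourceTopic, k -> new ArrayList<>()).add(shadowTopic);
    }

    // One call expands every registered shadow topic first, then the source topic,
    // so the caller does not need a separate update per shadow topic.
    void updatePartitionedTopic(String sourceTopic, int numPartitions) {
        int current = getPartitions(sourceTopic);
        if (numPartitions < current) {
            throw new IllegalArgumentException("Partitions can only be expanded");
        }
        for (String shadow : shadowsBySource.getOrDefault(sourceTopic, Collections.emptyList())) {
            partitions.put(shadow, numPartitions);
        }
        partitions.put(sourceTopic, numPartitions);
    }

    int getPartitions(String topic) {
        Integer n = partitions.get(topic);
        if (n == null) {
            throw new IllegalArgumentException("Unknown topic: " + topic);
        }
        return n;
    }

    public static void main(String[] args) {
        ShadowTopicModel model = new ShadowTopicModel();
        model.createPartitionedTopic("source", 1);
        model.createShadowTopic("source-shadow", "source");
        model.updatePartitionedTopic("source", 3);
        System.out.println(model.getPartitions("source-shadow")); // prints 3
    }
}
```

   With something along these lines, the expansion test would need only one `updatePartitionedTopic` call on the source topic. How partial failures across shadow topics should be handled is left open here and is probably one of the questions a PIP would have to answer.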