BewareMyPower commented on code in PR #22797:
URL: https://github.com/apache/pulsar/pull/22797#discussion_r1625271648


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java:
##########
@@ -113,6 +116,52 @@ public void testPartitionedShadowTopicSetup() throws Exception {
         Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition);
     }
 
+    @Test
+    public void testPartitionedShadowTopicProduceAndConsume() throws Exception {
+        String sourceTopic = newShadowSourceTopicName();
+        String shadowTopic = sourceTopic + "-shadow";
+        admin.topics().createPartitionedTopic(sourceTopic, 3);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+
+        // We should not allow to set with the shadow partition topic which contains `-partition-n`.
+        Assert.assertThrows(PulsarAdminException.class, ()-> admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic+"-partition-0")));
+        admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("test")
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("msg-" + i);
+        }
+
+        Set<String> set = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            Message<String> msg = consumer.receive();
+            set.add(msg.getValue());
+        }
+        for (int i = 0; i < 10; i++) {
+            Assert.assertTrue(set.contains("msg-" + i));
+        }
+    }
+
+    @Test
+    public void testPartitionedShadowTopicExpansion() throws Exception {
+        String sourceTopic = newShadowSourceTopicName();
+        String shadowTopic = sourceTopic + "-shadow";
+        admin.topics().createPartitionedTopic(sourceTopic, 1);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+        admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic));
+
+        Assert.assertThrows(PulsarAdminException.class, () -> admin.topics().updatePartitionedTopic(sourceTopic, 3));
+
+        admin.topics().updatePartitionedTopic(shadowTopic, 3);
+
+        admin.topics().updatePartitionedTopic(sourceTopic, 3);

Review Comment:
   I think we can separate this PR into two parts:
   1. Shadow topic support for partitioned topics.
   2. Validation when expanding the partitions of a partitioned topic that
      has shadow topics.
   
   IMO, the 2nd part does not need to be included in this PR. Requiring two
   `updatePartitionedTopic` RPCs seems a little complicated. Instead, we could
   update the partitions of the shadow topics as well when the source topic is
   updated. That might require a PIP.
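
   To make the suggestion concrete, here is a rough sketch of expanding the shadow topics together with the source topic in a single logical operation. It is only an illustration under stated assumptions, not Pulsar's implementation: `PartitionAdmin`, `InMemoryAdmin`, and `expandWithShadows` are hypothetical names, and the rule that a source topic cannot be expanded past its shadow topics is inferred from the test in this diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShadowPartitionSketch {

    /** Hypothetical, minimal admin surface; NOT the real PulsarAdmin API. */
    interface PartitionAdmin {
        List<String> getShadowTopics(String sourceTopic);
        int getPartitions(String topic);
        void updatePartitions(String topic, int numPartitions);
    }

    /** Expands every shadow topic first, then the source topic, in one call. */
    static void expandWithShadows(PartitionAdmin admin, String sourceTopic, int numPartitions) {
        for (String shadow : admin.getShadowTopics(sourceTopic)) {
            if (admin.getPartitions(shadow) < numPartitions) {
                admin.updatePartitions(shadow, numPartitions);
            }
        }
        admin.updatePartitions(sourceTopic, numPartitions);
    }

    /** In-memory fake mimicking the validation that the test in this PR exercises. */
    static class InMemoryAdmin implements PartitionAdmin {
        private final Map<String, Integer> partitions = new HashMap<>();
        private final Map<String, List<String>> shadows = new HashMap<>();

        void create(String topic, int numPartitions) {
            partitions.put(topic, numPartitions);
        }

        void setShadowTopics(String sourceTopic, List<String> shadowTopics) {
            shadows.put(sourceTopic, new ArrayList<>(shadowTopics));
        }

        @Override
        public List<String> getShadowTopics(String sourceTopic) {
            return shadows.getOrDefault(sourceTopic, Collections.emptyList());
        }

        @Override
        public int getPartitions(String topic) {
            Integer count = partitions.get(topic);
            if (count == null) {
                throw new IllegalArgumentException("Unknown topic: " + topic);
            }
            return count;
        }

        @Override
        public void updatePartitions(String topic, int numPartitions) {
            // Assumed rule: reject the update if any shadow topic has fewer partitions.
            for (String shadow : getShadowTopics(topic)) {
                if (getPartitions(shadow) < numPartitions) {
                    throw new IllegalStateException("Shadow topic " + shadow + " must be expanded first");
                }
            }
            partitions.put(topic, numPartitions);
        }
    }

    public static void main(String[] args) {
        InMemoryAdmin admin = new InMemoryAdmin();
        admin.create("src", 1);
        admin.create("src-shadow", 1);
        admin.setShadowTopics("src", Collections.singletonList("src-shadow"));

        // One call replaces the two manual updatePartitionedTopic RPCs from the test.
        expandWithShadows(admin, "src", 3);
        System.out.println(admin.getPartitions("src") + " " + admin.getPartitions("src-shadow"));
    }
}
```

   With something like this on the broker side, callers would only update the source topic and the ordering constraint would be handled internally. Failure handling (for example, one shadow topic failing to expand midway) is left out here and is the kind of detail a PIP would need to settle.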


