BewareMyPower commented on code in PR #15663:
URL: https://github.com/apache/pulsar/pull/15663#discussion_r881249312


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java:
##########
@@ -171,4 +177,56 @@ public void testHealthCheckTopicNotOffload() throws 
Exception {
         });
     }
 
+    @Test
+    private void testSetBacklogCausedCreatingProducerFailure() throws 
Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", 
topic), 1);
+        BacklogQuota quota = BacklogQuota.builder()
+                .limitTime(30)

Review Comment:
   Should we reduce this time limit, since the rollover time has been changed to 3 seconds?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java:
##########
@@ -171,4 +177,56 @@ public void testHealthCheckTopicNotOffload() throws 
Exception {
         });
     }
 
+    @Test
+    private void testSetBacklogCausedCreatingProducerFailure() throws 
Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", 
topic), 1);
+        BacklogQuota quota = BacklogQuota.builder()
+                .limitTime(30)
+                .limitSize(-1)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                .build();
+        admin.namespaces().setBacklogQuota(ns, quota, 
BacklogQuota.BacklogQuotaType.message_age);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        String partition0 = TopicName.get(String.format("persistent://%s", 
topic)).getPartition(0).toString();
+        Optional<Topic> topicReference = 
pulsar.getBrokerService().getTopicReference(partition0);
+        Assert.assertTrue(topicReference.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) 
topicReference.get();
+        
persistentTopic.getManagedLedger().getConfig().setMinimumRolloverTime(3, 
TimeUnit.SECONDS);
+        
persistentTopic.getManagedLedger().getConfig().setMaximumRolloverTime(3, 
TimeUnit.SECONDS);
+        String msg1 = "msg-1";
+        producer.send(msg1);
+        Thread.sleep(3 * 1000);
+
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub-1")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Message<String> receive = consumer2.receive();
+        consumer2.acknowledge(receive);
+
+        Thread.sleep(5 * 1000);
+
+        int producerCount = 10;
+        for (int i = 0; i < producerCount; i++) {
+            try {
+                Thread.sleep(1000);

Review Comment:
   Why do you sleep for 1 second between creating consecutive producers?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java:
##########
@@ -171,4 +177,56 @@ public void testHealthCheckTopicNotOffload() throws 
Exception {
         });
     }
 
+    @Test
+    private void testSetBacklogCausedCreatingProducerFailure() throws 
Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", 
topic), 1);
+        BacklogQuota quota = BacklogQuota.builder()
+                .limitTime(30)
+                .limitSize(-1)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                .build();
+        admin.namespaces().setBacklogQuota(ns, quota, 
BacklogQuota.BacklogQuotaType.message_age);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        String partition0 = TopicName.get(String.format("persistent://%s", 
topic)).getPartition(0).toString();
+        Optional<Topic> topicReference = 
pulsar.getBrokerService().getTopicReference(partition0);
+        Assert.assertTrue(topicReference.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) 
topicReference.get();
+        
persistentTopic.getManagedLedger().getConfig().setMinimumRolloverTime(3, 
TimeUnit.SECONDS);
+        
persistentTopic.getManagedLedger().getConfig().setMaximumRolloverTime(3, 
TimeUnit.SECONDS);

Review Comment:
   It looks like we need to call `setLedgerRolloverTimeout`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to