poorbarcode commented on code in PR #23980:
URL: https://github.com/apache/pulsar/pull/23980#discussion_r1955483706


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -245,6 +245,7 @@ public static boolean isDedupCursorName(String name) {
     private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
     private volatile CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(
             COMPACTION_NEVER_RUN);
+    private volatile AtomicBoolean disablingCompaction = new 
AtomicBoolean(false);

Review Comment:
   Good point, I have changed the modifier



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -308,6 +319,145 @@ public void 
testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
         admin.topics().delete(topicName, false);
     }
 
+    @DataProvider
+    public Object[][] isInjectedCursorDeleteError() {
+        return new Object[][] {
+                {false},
+                {true}
+        };
+    }
+
+    @Test(dataProvider = "isInjectedCursorDeleteError")
+    public void testReadMsgsAfterDisableCompaction(boolean 
isInjectedCursorDeleteError) throws Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topicPolicies().setCompactionThreshold(topicName, 1);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        producer.newMessage().key("k0").value("v0").send();
+        producer.newMessage().key("k1").value("v1").send();
+        producer.newMessage().key("k2").value("v2").send();
+        triggerCompactionAndWait(topicName);
+        admin.topics().deleteSubscription(topicName, "s1");
+
+        // Disable compaction.
+        // Inject a failure that the first time to delete cursor will fail.
+        if (isInjectedCursorDeleteError) {
+            AtomicInteger times = new AtomicInteger();
+            String cursorPath = 
String.format("/managed-ledgers/%s/__compaction",
+                    TopicName.get(topicName).getPersistenceNamingEncoding());
+            admin.topicPolicies().removeCompactionThreshold(topicName);
+            mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED, 
(op, path) -> {
+                return op == MockZooKeeper.Op.DELETE && 
cursorPath.equals(path) && times.incrementAndGet() == 1;
+            });
+            
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op, 
path) -> {
+                return op == MockZooKeeper.Op.DELETE && 
cursorPath.equals(path) && times.incrementAndGet() == 1;
+            });
+            try {
+                admin.topics().deleteSubscription(topicName, "__compaction");
+                fail("Should fail");
+            } catch (Exception ex) {
+                assertTrue(ex instanceof 
PulsarAdminException.ServerSideErrorException);
+            }
+        }
+
+        // Create a reader with start at earliest.
+        // Verify: the reader will receive 3 messages.
+        admin.topics().unload(topicName);
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
+                .startMessageId(MessageId.earliest).create();
+        producer.newMessage().key("k3").value("v3").send();
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m0.getValue(), "v0");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m1.getValue(), "v1");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m2.getValue(), "v2");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m3.getValue(), "v3");
+
+        // cleanup.
+        producer.close();
+        reader.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test
+    public void testDisableCompactionConcurrently() throws Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topicPolicies().setCompactionThreshold(topicName, 1);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        producer.newMessage().key("k0").value("v0").send();
+        triggerCompactionAndWait(topicName);
+        admin.topics().deleteSubscription(topicName, "s1");
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        AtomicBoolean disablingCompaction =
+                WhiteboxImpl.getInternalState(persistentTopic, 
"disablingCompaction");

Review Comment:
   Changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to