lhotari commented on code in PR #23980:
URL: https://github.com/apache/pulsar/pull/23980#discussion_r1955096813


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1340,18 +1341,42 @@ private void 
asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription s
             return;
         }
 
-        currentCompaction.handle((__, e) -> {
-            if (e != null) {
-                log.warn("[{}][{}] Last compaction task failed", topic, 
subscriptionName);
+        // Avoid concurrently execute compaction and unsubscribing.
+        synchronized (this) {

Review Comment:
   Is there really a need for a synchronized block here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -245,6 +245,7 @@ public static boolean isDedupCursorName(String name) {
     private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
     private volatile CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(
             COMPACTION_NEVER_RUN);
+    private volatile AtomicBoolean disablingCompaction = new 
AtomicBoolean(false);

Review Comment:
   ```suggestion
       private final AtomicBoolean disablingCompaction = new 
AtomicBoolean(false);
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -308,6 +319,145 @@ public void 
testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
         admin.topics().delete(topicName, false);
     }
 
+    @DataProvider
+    public Object[][] isInjectedCursorDeleteError() {
+        return new Object[][] {
+                {false},
+                {true}
+        };
+    }
+
+    @Test(dataProvider = "isInjectedCursorDeleteError")
+    public void testReadMsgsAfterDisableCompaction(boolean 
isInjectedCursorDeleteError) throws Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topicPolicies().setCompactionThreshold(topicName, 1);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        producer.newMessage().key("k0").value("v0").send();
+        producer.newMessage().key("k1").value("v1").send();
+        producer.newMessage().key("k2").value("v2").send();
+        triggerCompactionAndWait(topicName);
+        admin.topics().deleteSubscription(topicName, "s1");
+
+        // Disable compaction.
+        // Inject a failure that the first time to delete cursor will fail.
+        if (isInjectedCursorDeleteError) {
+            AtomicInteger times = new AtomicInteger();
+            String cursorPath = 
String.format("/managed-ledgers/%s/__compaction",
+                    TopicName.get(topicName).getPersistenceNamingEncoding());
+            admin.topicPolicies().removeCompactionThreshold(topicName);
+            mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED, 
(op, path) -> {
+                return op == MockZooKeeper.Op.DELETE && 
cursorPath.equals(path) && times.incrementAndGet() == 1;
+            });
+            
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op, 
path) -> {
+                return op == MockZooKeeper.Op.DELETE && 
cursorPath.equals(path) && times.incrementAndGet() == 1;
+            });
+            try {
+                admin.topics().deleteSubscription(topicName, "__compaction");
+                fail("Should fail");
+            } catch (Exception ex) {
+                assertTrue(ex instanceof 
PulsarAdminException.ServerSideErrorException);
+            }
+        }
+
+        // Create a reader with start at earliest.
+        // Verify: the reader will receive 3 messages.
+        admin.topics().unload(topicName);
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
+                .startMessageId(MessageId.earliest).create();
+        producer.newMessage().key("k3").value("v3").send();
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m0.getValue(), "v0");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m1.getValue(), "v1");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m2.getValue(), "v2");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m3.getValue(), "v3");
+
+        // cleanup.
+        producer.close();
+        reader.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test
+    public void testDisableCompactionConcurrently() throws Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topicPolicies().setCompactionThreshold(topicName, 1);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        producer.newMessage().key("k0").value("v0").send();
+        triggerCompactionAndWait(topicName);
+        admin.topics().deleteSubscription(topicName, "s1");
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        AtomicBoolean disablingCompaction =
+                WhiteboxImpl.getInternalState(persistentTopic, 
"disablingCompaction");

Review Comment:
   Please avoid using reflection/`WhiteboxImpl`. A better approach is to have a 
default access method which is annotation with `@VisibleForTesting`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1340,18 +1341,42 @@ private void 
asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription s
             return;
         }
 
-        currentCompaction.handle((__, e) -> {
-            if (e != null) {
-                log.warn("[{}][{}] Last compaction task failed", topic, 
subscriptionName);
+        // Avoid concurrently execute compaction and unsubscribing.
+        synchronized (this) {
+            if (!disablingCompaction.compareAndSet(false, true)) {
+                unsubscribeFuture.completeExceptionally(

Review Comment:
   this should be protected with `if (!unsubscribeFuture.isDone()) {` so that 
creating the exception could be avoided.



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -308,6 +319,145 @@ public void 
testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
         admin.topics().delete(topicName, false);
     }
 
+    @DataProvider
+    public Object[][] isInjectedCursorDeleteError() {
+        return new Object[][] {
+                {false},
+                {true}
+        };
+    }
+
+    @Test(dataProvider = "isInjectedCursorDeleteError")
+    public void testReadMsgsAfterDisableCompaction(boolean 
isInjectedCursorDeleteError) throws Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topicPolicies().setCompactionThreshold(topicName, 1);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        producer.newMessage().key("k0").value("v0").send();
+        producer.newMessage().key("k1").value("v1").send();
+        producer.newMessage().key("k2").value("v2").send();
+        triggerCompactionAndWait(topicName);
+        admin.topics().deleteSubscription(topicName, "s1");
+
+        // Disable compaction.
+        // Inject a failure that the first time to delete cursor will fail.
+        if (isInjectedCursorDeleteError) {
+            AtomicInteger times = new AtomicInteger();
+            String cursorPath = 
String.format("/managed-ledgers/%s/__compaction",
+                    TopicName.get(topicName).getPersistenceNamingEncoding());
+            admin.topicPolicies().removeCompactionThreshold(topicName);
+            mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED, 
(op, path) -> {
+                return op == MockZooKeeper.Op.DELETE && 
cursorPath.equals(path) && times.incrementAndGet() == 1;
+            });
+            
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op, 
path) -> {
+                return op == MockZooKeeper.Op.DELETE && 
cursorPath.equals(path) && times.incrementAndGet() == 1;
+            });
+            try {
+                admin.topics().deleteSubscription(topicName, "__compaction");
+                fail("Should fail");
+            } catch (Exception ex) {
+                assertTrue(ex instanceof 
PulsarAdminException.ServerSideErrorException);
+            }
+        }
+
+        // Create a reader with start at earliest.
+        // Verify: the reader will receive 3 messages.
+        admin.topics().unload(topicName);
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
+                .startMessageId(MessageId.earliest).create();
+        producer.newMessage().key("k3").value("v3").send();
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m0.getValue(), "v0");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m1.getValue(), "v1");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m2.getValue(), "v2");
+        assertTrue(reader.hasMessageAvailable());
+        Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
+        assertEquals(m3.getValue(), "v3");
+
+        // cleanup.
+        producer.close();
+        reader.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test
+    public void testDisableCompactionConcurrently() throws Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topicPolicies().setCompactionThreshold(topicName, 1);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        producer.newMessage().key("k0").value("v0").send();
+        triggerCompactionAndWait(topicName);
+        admin.topics().deleteSubscription(topicName, "s1");
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        AtomicBoolean disablingCompaction =
+                WhiteboxImpl.getInternalState(persistentTopic, 
"disablingCompaction");
+
+        // Disable compaction.
+        // Inject a delay when the first time of deleting cursor.
+        AtomicInteger times = new AtomicInteger();
+        String cursorPath = String.format("/managed-ledgers/%s/__compaction",
+                TopicName.get(topicName).getPersistenceNamingEncoding());
+        admin.topicPolicies().removeCompactionThreshold(topicName);
+        mockZooKeeper.delay(5000, (op, path) -> {
+            return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) && 
times.incrementAndGet() == 1;
+        });
+        mockZooKeeperGlobal.delay(5000, (op, path) -> {
+            return op == MockZooKeeper.Op.DELETE && cursorPath.equals(path) && 
times.incrementAndGet() == 1;
+        });
+        AtomicReference<CompletableFuture<Void>> f1 = new 
AtomicReference<CompletableFuture<Void>>();
+        AtomicReference<CompletableFuture<Void>> f2 = new 
AtomicReference<CompletableFuture<Void>>();
+        new Thread(() -> {
+            f1.set(admin.topics().deleteSubscriptionAsync(topicName, 
"__compaction"));
+        }).start();
+        new Thread(() -> {
+            f2.set(admin.topics().deleteSubscriptionAsync(topicName, 
"__compaction"));
+        }).start();
+
+        // Verify: the next compaction will be skipped.
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(disablingCompaction.get());
+        });
+        producer.newMessage().key("k1").value("v1").send();
+        producer.newMessage().key("k2").value("v2").send();
+        CompletableFuture<Long> currentCompaction1 =
+                WhiteboxImpl.getInternalState(persistentTopic, 
"currentCompaction");
+        persistentTopic.triggerCompaction();
+        CompletableFuture<Long> currentCompaction2 =
+                WhiteboxImpl.getInternalState(persistentTopic, 
"currentCompaction");

Review Comment:
   Again, avoid reflection/`WhiteboxImpl`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to