poorbarcode commented on code in PR #23980:
URL: https://github.com/apache/pulsar/pull/23980#discussion_r1955483706
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -245,6 +245,7 @@ public static boolean isDedupCursorName(String name) {
private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
private volatile CompletableFuture<Long> currentCompaction =
CompletableFuture.completedFuture(
COMPACTION_NEVER_RUN);
+ private volatile AtomicBoolean disablingCompaction = new
AtomicBoolean(false);
Review Comment:
Good point; I have changed the modifier.
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -308,6 +319,145 @@ public void
testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBa
admin.topics().delete(topicName, false);
}
+ @DataProvider
+ public Object[][] isInjectedCursorDeleteError() {
+ return new Object[][] {
+ {false},
+ {true}
+ };
+ }
+
+ @Test(dataProvider = "isInjectedCursorDeleteError")
+ public void testReadMsgsAfterDisableCompaction(boolean
isInjectedCursorDeleteError) throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ producer.newMessage().key("k1").value("v1").send();
+ producer.newMessage().key("k2").value("v2").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+
+ // Disable compaction.
+ // Inject a failure so that the first attempt to delete the cursor fails.
+ if (isInjectedCursorDeleteError) {
+ AtomicInteger times = new AtomicInteger();
+ String cursorPath =
String.format("/managed-ledgers/%s/__compaction",
+ TopicName.get(topicName).getPersistenceNamingEncoding());
+ admin.topicPolicies().removeCompactionThreshold(topicName);
+ mockZooKeeper.failConditional(KeeperException.Code.SESSIONEXPIRED,
(op, path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+
mockZooKeeperGlobal.failConditional(KeeperException.Code.SESSIONEXPIRED, (op,
path) -> {
+ return op == MockZooKeeper.Op.DELETE &&
cursorPath.equals(path) && times.incrementAndGet() == 1;
+ });
+ try {
+ admin.topics().deleteSubscription(topicName, "__compaction");
+ fail("Should fail");
+ } catch (Exception ex) {
+ assertTrue(ex instanceof
PulsarAdminException.ServerSideErrorException);
+ }
+ }
+
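+ // Create a reader that starts at the earliest position.
+ // Verify: the reader receives the 3 original messages (v0, v1, v2),
+ // followed by the message (v3) published after the reader was created.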
+ admin.topics().unload(topicName);
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).readCompacted(true)
+ .startMessageId(MessageId.earliest).create();
+ producer.newMessage().key("k3").value("v3").send();
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m0 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m0.getValue(), "v0");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m1 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m1.getValue(), "v1");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m2 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m2.getValue(), "v2");
+ assertTrue(reader.hasMessageAvailable());
+ Message<String> m3 = reader.readNext(10, TimeUnit.SECONDS);
+ assertEquals(m3.getValue(), "v3");
+
+ // cleanup.
+ producer.close();
+ reader.close();
+ admin.topics().delete(topicName, false);
+ }
+
+ @Test
+ public void testDisableCompactionConcurrently() throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setCompactionThreshold(topicName, 1);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
+ var producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ producer.newMessage().key("k0").value("v0").send();
+ triggerCompactionAndWait(topicName);
+ admin.topics().deleteSubscription(topicName, "s1");
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).get().get();
+ AtomicBoolean disablingCompaction =
+ WhiteboxImpl.getInternalState(persistentTopic,
"disablingCompaction");
Review Comment:
Changed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]