This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 14824a5adbd [improve][broker]consumer backlog eviction policy should
not reset read position for consumer (#18350)
14824a5adbd is described below
commit 14824a5adbd052f6293763a05f5f84021dfcf234
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Nov 7 10:26:11 2022 +0800
[improve][broker]consumer backlog eviction policy should not reset read
position for consumer (#18350)
---
.../pulsar/broker/service/BacklogQuotaManager.java | 16 +++++---
.../broker/service/BacklogQuotaManagerTest.java | 47 +++++++++++++++-------
2 files changed, 43 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index ee12c3ff743..607e7387fb3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -239,22 +239,28 @@ public class BacklogQuotaManager {
Long currentMillis = ((ManagedLedgerImpl)
persistentTopic.getManagedLedger()).getClock().millis();
ManagedLedgerImpl mLedger = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
try {
- for (;;) {
+ for (; ; ) {
ManagedCursor slowestConsumer =
mLedger.getSlowestConsumer();
Position oldestPosition =
slowestConsumer.getMarkDeletedPosition();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] slowest consumer mark delete position
is [{}], read position is [{}]",
+ slowestConsumer.getName(), oldestPosition,
slowestConsumer.getReadPosition());
+ }
ManagedLedgerInfo.LedgerInfo ledgerInfo =
mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
if (ledgerInfo == null) {
-
slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl)
oldestPosition));
+ PositionImpl nextPosition =
+
PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1);
+ slowestConsumer.markDelete(nextPosition);
continue;
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() >
quota.getLimitTime()) {
// skip whole ledger for the slowest cursor
- PositionImpl nextPosition =
mLedger.getNextValidPosition(
- PositionImpl.get(ledgerInfo.getLedgerId(),
ledgerInfo.getEntries() - 1));
+ PositionImpl nextPosition =
+
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
if (!nextPosition.equals(oldestPosition)) {
- slowestConsumer.resetCursor(nextPosition);
+ slowestConsumer.markDelete(nextPosition);
continue;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 21fcffac1ea..781a90a1c10 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -35,8 +35,11 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -47,7 +50,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -148,7 +150,7 @@ public class BacklogQuotaManagerTest {
}
/**
- * Readers should not effect backlog quota
+ * Readers should not effect backlog quota.
*/
@Test
public void testBacklogQuotaWithReader() throws Exception {
@@ -160,11 +162,13 @@ public class BacklogQuotaManagerTest {
.limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build());
- try (PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0,
TimeUnit.SECONDS).build();) {
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();) {
final String topic1 = "persistent://prop/ns-quota/topic1";
final int numMsgs = 20;
- Reader<byte[]> reader =
client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
+ Reader<byte[]> reader =
client.newReader().topic(topic1).receiverQueueSize(1)
+ .startMessageId(MessageId.latest).create();
org.apache.pulsar.client.api.Producer<byte[]> producer =
createProducer(client, topic1);
@@ -187,7 +191,7 @@ public class BacklogQuotaManagerTest {
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog =
stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
- "non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]"); ;
+ "non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]");
try {
// try to send over backlog quota and make sure it fails
@@ -237,10 +241,12 @@ public class BacklogQuotaManagerTest {
.limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build());
- try (PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0,
TimeUnit.SECONDS).build();) {
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();) {
final String topic1 = "persistent://prop/ns-quota/topic1" +
UUID.randomUUID();
final int numMsgs = 20;
- Reader<byte[]> reader =
client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
+ Reader<byte[]> reader =
client.newReader().topic(topic1).receiverQueueSize(1)
+ .startMessageId(MessageId.latest).create();
Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
@@ -257,7 +263,7 @@ public class BacklogQuotaManagerTest {
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog =
stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
- "non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]"); ;
+ "non-durable subscription backlog is [" +
nonDurableSubscriptionBacklog + "]");
try {
// try to send over backlog quota and make sure it fails
for (int i = 0; i < numMsgs; i++) {
@@ -307,10 +313,12 @@ public class BacklogQuotaManagerTest {
.limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build());
- try (PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0,
TimeUnit.SECONDS).build();) {
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString())
+ .statsInterval(0, TimeUnit.SECONDS).build();) {
final String topic1 = "persistent://prop/ns-quota/topic2" +
UUID.randomUUID();
final int numMsgs = 9;
- Reader<byte[]> reader =
client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
+ Reader<byte[]> reader =
client.newReader().topic(topic1).receiverQueueSize(1)
+ .startMessageId(MessageId.latest).create();
Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
@@ -472,14 +480,22 @@ public class BacklogQuotaManagerTest {
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(),
14);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(),
14);
+ PersistentTopic topic1Reference = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic1).get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
topic1Reference.getManagedLedger();
+ Position slowConsumerReadPos =
ml.getSlowestConsumer().getReadPosition();
+
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
rolloverStats();
- stats = admin.topics().getStats(topic1);
+ TopicStats stats2 = admin.topics().getStats(topic1);
// Messages on first 2 ledgers should be expired, backlog is number of
- // message in current ledger which should be 4.
- assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(),
4);
- assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(),
4);
+ // message in current ledger.
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(),
ml.getCurrentLedgerEntries());
+
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(),
ml.getCurrentLedgerEntries());
+ });
+
+ assertEquals(ml.getSlowestConsumer().getReadPosition(),
slowConsumerReadPos);
client.close();
}
@@ -1284,7 +1300,8 @@ public class BacklogQuotaManagerTest {
pulsar.start();
@Cleanup
- PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0,
TimeUnit.SECONDS)
+ PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
.build();
final String topic1 = "persistent://prop/ns-quota/topic2";