This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit be22a2419edf82cb275f5246c0e8a62f753f8c9e Author: lipenghui <peng...@apache.org> AuthorDate: Tue Jun 22 18:11:06 2021 +0800 Fix potential data lost on the system topic when topic compaction have not triggered yet (#11003) To pre-create the subscription for the compactor to avoid lost any data since we are using reader for reading data from the __change_events topic, if no durable subscription on the topic, the data might be lost. Since we are using the topic compaction on the __change_events topic to reduce the topic policy cache recovery time, so we can leverage the topic compaction cursor for retaining the data. (cherry picked from commit 94ec03111369e694f432ca219be77820648d2188) --- .../pulsar/broker/service/BrokerService.java | 11 ++++- .../broker/service/persistent/PersistentTopic.java | 14 ++++-- .../broker/service/persistent/SystemTopic.java | 22 ++++++++- .../pulsar/broker/admin/TopicPoliciesTest.java | 14 +++++- .../SystemTopicBasedTopicPoliciesServiceTest.java | 54 ++++++++++++---------- 5 files changed, 81 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9376d85..4f41346 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1108,8 +1108,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) : new PersistentTopic(topic, ledger, BrokerService.this); + CompletableFuture<Void> preCreateSubForCompaction = + CompletableFuture.completedFuture(null); + if (persistentTopic instanceof SystemTopic) { + preCreateSubForCompaction = ((SystemTopic) persistentTopic) + .preCreateSubForCompactionIfNeeded(); + } CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication(); - replicationFuture.thenCompose(v -> { + FutureUtil.waitForAll(Lists.newArrayList(preCreateSubForCompaction, replicationFuture)) + .thenCompose(v -> { // Also check dedup status return persistentTopic.checkDeduplicationStatus(); }).thenRun(() -> { @@ -1139,7 +1146,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies return null; }); - } catch (NamingException e) { + } catch (NamingException | PulsarServerException e) { log.warn("Failed to create topic {}-{}", topic, e.getMessage()); pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 45de988..8738da1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; - +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.carrotsearch.hppc.ObjectObjectHashMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -136,7 +136,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicImpl; -import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; import org.apache.zookeeper.KeeperException; @@ -334,7 +333,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, boolean replicated) { checkNotNull(compactedTopic); - if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) { + if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) { return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); } else { return new PersistentSubscription(this, subscriptionName, cursor, replicated); @@ -1248,7 +1247,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal long backlogEstimate = 0; - PersistentSubscription compactionSub = subscriptions.get(Compactor.COMPACTION_SUBSCRIPTION); + PersistentSubscription compactionSub = subscriptions.get(COMPACTION_SUBSCRIPTION); if (compactionSub != null) { backlogEstimate = compactionSub.estimateBacklogSize(); } else { @@ -1275,6 +1274,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } + /** + * Return if the topic has triggered compaction before or not. + */ + protected boolean hasCompactionTriggered() { + return subscriptions.containsKey(COMPACTION_SUBSCRIPTION); + } + CompletableFuture<Void> startReplicator(String remoteCluster) { log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster); final CompletableFuture<Void> future = new CompletableFuture<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 0b714c6..536b6bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -19,13 +19,18 @@ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; public class SystemTopic extends PersistentTopic { - public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws BrokerServiceException.NamingException { + public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerService) + throws BrokerServiceException.NamingException, PulsarServerException { super(topic, ledger, brokerService); } @@ -48,4 +53,19 @@ public class SystemTopic extends PersistentTopic { public void checkGC() { // do nothing for system topic } + + public CompletableFuture<Void> preCreateSubForCompactionIfNeeded() { + if (!super.hasCompactionTriggered()) { + // To pre-create the subscription for the compactor to avoid lost any data since we are using reader + // for reading data from the __change_events topic, if no durable subscription on the topic, + // the data might be lost. Since we are using the topic compaction on the __change_events topic + // to reduce the topic policy cache recovery time, + // so we can leverage the topic compaction cursor for retaining the data. + return super.createSubscription(COMPACTION_SUBSCRIPTION, + CommandSubscribe.InitialPosition.Earliest, false) + .thenCompose(__ -> CompletableFuture.completedFuture(null)); + } else { + return CompletableFuture.completedFuture(null); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 6610bf1..b9b7b69 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -1523,6 +1524,15 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() + .untilAsserted(() -> { + TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic); + Assert.assertTrue(stats.subscriptions.containsKey("__compaction")); + }); + + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicPolicyEventsTopic); + long previousCompactedLedgerId = internalStats.compactedLedger.ledgerId; + + Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); @@ -1530,8 +1540,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Awaitility.await() .untilAsserted(() -> { - TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic); - Assert.assertTrue(stats.subscriptions.containsKey("__compaction")); + PersistentTopicInternalStats iStats = admin.topics().getInternalStats(topicPolicyEventsTopic); + Assert.assertTrue(iStats.compactedLedger.ledgerId != previousCompactedLedgerId); }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 0de5543..9eb8669 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -76,11 +77,14 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); // Wait for all topic policies updated. - Thread.sleep(3000); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(systemTopicBasedTopicPoliciesService + .getPoliciesCacheInit(TOPIC1.getNamespaceObject()))); - Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject())); // Assert broker is cache all topic policies - Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1) + .getMaxConsumerPerTopic().intValue(), 10)); // Update policy for TOPIC1 TopicPolicies policies1 = TopicPolicies.builder() @@ -118,21 +122,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic .build(); systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get(); - Thread.sleep(1000); - - TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); - TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); - TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); - TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); - TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); - TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); - - Assert.assertEquals(policiesGet1, policies1); - Assert.assertEquals(policiesGet2, policies2); - Assert.assertEquals(policiesGet3, policies3); - Assert.assertEquals(policiesGet4, policies4); - Assert.assertEquals(policiesGet5, policies5); - Assert.assertEquals(policiesGet6, policies6); + Awaitility.await().untilAsserted(() -> { + TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); + TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); + TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); + TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); + TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); + TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); + + Assert.assertEquals(policiesGet1, policies1); + Assert.assertEquals(policiesGet2, policies2); + Assert.assertEquals(policiesGet3, policies3); + Assert.assertEquals(policiesGet4, policies4); + Assert.assertEquals(policiesGet5, policies5); + Assert.assertEquals(policiesGet6, policies6); + }); // Remove reader cache will remove policies cache Assert.assertEquals(systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), 6); @@ -155,13 +159,13 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic policies1.setMaxProducerPerTopic(106); systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); - Thread.sleep(1000); - // reader for NAMESPACE1 will back fill the reader cache - policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); - policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); - Assert.assertEquals(policies1, policiesGet1); - Assert.assertEquals(policies2, policiesGet2); + Awaitility.await().untilAsserted(() -> { + TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); + TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); + Assert.assertEquals(policies1, policiesGet1); + Assert.assertEquals(policies2, policiesGet2); + }); // Check reader cache is correct. Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); @@ -169,7 +173,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); // Check get without cache - policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); + TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); Assert.assertEquals(policies1, policiesGet1); }