This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit be22a2419edf82cb275f5246c0e8a62f753f8c9e
Author: lipenghui <peng...@apache.org>
AuthorDate: Tue Jun 22 18:11:06 2021 +0800

    Fix potential data loss on the system topic when topic compaction has not 
been triggered yet (#11003)
    
    Pre-create the subscription for the compactor to avoid losing any data. 
We use a reader
    for reading data from the __change_events topic, and if there is no durable 
subscription on the topic,
    the data might be lost. Since we already use topic compaction on the 
__change_events topic
    to reduce the topic policy cache recovery time,
    we can leverage the topic compaction cursor for retaining the data.
    
    (cherry picked from commit 94ec03111369e694f432ca219be77820648d2188)
---
 .../pulsar/broker/service/BrokerService.java       | 11 ++++-
 .../broker/service/persistent/PersistentTopic.java | 14 ++++--
 .../broker/service/persistent/SystemTopic.java     | 22 ++++++++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 14 +++++-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 54 ++++++++++++----------
 5 files changed, 81 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9376d85..4f41346 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1108,8 +1108,15 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                                 PersistentTopic persistentTopic = 
isSystemTopic(topic)
                                         ? new SystemTopic(topic, ledger, 
BrokerService.this)
                                         : new PersistentTopic(topic, ledger, 
BrokerService.this);
+                                CompletableFuture<Void> 
preCreateSubForCompaction =
+                                        
CompletableFuture.completedFuture(null);
+                                if (persistentTopic instanceof SystemTopic) {
+                                    preCreateSubForCompaction = ((SystemTopic) 
persistentTopic)
+                                            
.preCreateSubForCompactionIfNeeded();
+                                }
                                 CompletableFuture<Void> replicationFuture = 
persistentTopic.checkReplication();
-                                replicationFuture.thenCompose(v -> {
+                                
FutureUtil.waitForAll(Lists.newArrayList(preCreateSubForCompaction, 
replicationFuture))
+                                .thenCompose(v -> {
                                     // Also check dedup status
                                     return 
persistentTopic.checkDeduplicationStatus();
                                 }).thenRun(() -> {
@@ -1139,7 +1146,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
                                     return null;
                                 });
-                            } catch (NamingException e) {
+                            } catch (NamingException | PulsarServerException 
e) {
                                 log.warn("Failed to create topic {}-{}", 
topic, e.getMessage());
                                 pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                                 topicFuture.completeExceptionally(e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 45de988..8738da1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -22,7 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -136,7 +136,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
-import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 import org.apache.zookeeper.KeeperException;
@@ -334,7 +333,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private PersistentSubscription createPersistentSubscription(String 
subscriptionName, ManagedCursor cursor,
             boolean replicated) {
         checkNotNull(compactedTopic);
-        if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+        if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) {
             return new CompactorSubscription(this, compactedTopic, 
subscriptionName, cursor);
         } else {
             return new PersistentSubscription(this, subscriptionName, cursor, 
replicated);
@@ -1248,7 +1247,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
                 long backlogEstimate = 0;
 
-                PersistentSubscription compactionSub = 
subscriptions.get(Compactor.COMPACTION_SUBSCRIPTION);
+                PersistentSubscription compactionSub = 
subscriptions.get(COMPACTION_SUBSCRIPTION);
                 if (compactionSub != null) {
                     backlogEstimate = compactionSub.estimateBacklogSize();
                 } else {
@@ -1275,6 +1274,13 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
     }
 
+    /**
+     * Return whether the topic has triggered compaction before, i.e. whether the compaction subscription exists.
+     */
+    protected boolean hasCompactionTriggered() {
+        return subscriptions.containsKey(COMPACTION_SUBSCRIPTION);
+    }
+
     CompletableFuture<Void> startReplicator(String remoteCluster) {
         log.info("[{}] Starting replicator to remote: {}", topic, 
remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 0b714c6..536b6bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -19,13 +19,18 @@
 
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 
 public class SystemTopic extends PersistentTopic {
 
-    public SystemTopic(String topic, ManagedLedger ledger, BrokerService 
brokerService) throws BrokerServiceException.NamingException {
+    public SystemTopic(String topic, ManagedLedger ledger, BrokerService 
brokerService)
+            throws BrokerServiceException.NamingException, 
PulsarServerException {
         super(topic, ledger, brokerService);
     }
 
@@ -48,4 +53,19 @@ public class SystemTopic extends PersistentTopic {
     public void checkGC() {
         // do nothing for system topic
     }
+
+    public CompletableFuture<Void> preCreateSubForCompactionIfNeeded() {
+        if (!super.hasCompactionTriggered()) {
+            // Pre-create the subscription for the compactor to avoid losing 
any data. We use a reader
+            // for reading data from the __change_events topic, and if there is no durable 
subscription on the topic,
+            // the data might be lost. Since we already use topic compaction 
on the __change_events topic
+            // to reduce the topic policy cache recovery time,
+            // we can leverage the topic compaction cursor for retaining 
the data.
+            return super.createSubscription(COMPACTION_SUBSCRIPTION,
+                    CommandSubscribe.InitialPosition.Earliest, false)
+                    .thenCompose(__ -> 
CompletableFuture.completedFuture(null));
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 6610bf1..b9b7b69 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -1523,6 +1524,15 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         log.info("Backlog quota set success on topic: {}", testTopic);
 
         Awaitility.await()
+                .untilAsserted(() -> {
+                    TopicStats stats = 
admin.topics().getStats(topicPolicyEventsTopic);
+                    
Assert.assertTrue(stats.subscriptions.containsKey("__compaction"));
+                });
+
+        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topicPolicyEventsTopic);
+        long previousCompactedLedgerId = 
internalStats.compactedLedger.ledgerId;
+
+        Awaitility.await()
                 .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                         
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
 
@@ -1530,8 +1540,8 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
         Awaitility.await()
                 .untilAsserted(() -> {
-                    TopicStats stats = 
admin.topics().getStats(topicPolicyEventsTopic);
-                    
Assert.assertTrue(stats.subscriptions.containsKey("__compaction"));
+                    PersistentTopicInternalStats iStats = 
admin.topics().getInternalStats(topicPolicyEventsTopic);
+                    Assert.assertTrue(iStats.compactedLedger.ledgerId != 
previousCompactedLedgerId);
                 });
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 0de5543..9eb8669 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -76,11 +77,14 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, 
initPolicy).get();
 
         // Wait for all topic policies updated.
-        Thread.sleep(3000);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(systemTopicBasedTopicPoliciesService
+                        .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
 
-        
Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
         // Assert broker is cache all topic policies
-        
Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(),
 10);
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1)
+                        .getMaxConsumerPerTopic().intValue(), 10));
 
         // Update policy for TOPIC1
         TopicPolicies policies1 = TopicPolicies.builder()
@@ -118,21 +122,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
                 .build();
         systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, 
policies6).get();
 
-        Thread.sleep(1000);
-
-        TopicPolicies policiesGet1 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
-        TopicPolicies policiesGet2 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
-        TopicPolicies policiesGet3 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3);
-        TopicPolicies policiesGet4 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4);
-        TopicPolicies policiesGet5 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5);
-        TopicPolicies policiesGet6 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6);
-
-        Assert.assertEquals(policiesGet1, policies1);
-        Assert.assertEquals(policiesGet2, policies2);
-        Assert.assertEquals(policiesGet3, policies3);
-        Assert.assertEquals(policiesGet4, policies4);
-        Assert.assertEquals(policiesGet5, policies5);
-        Assert.assertEquals(policiesGet6, policies6);
+        Awaitility.await().untilAsserted(() -> {
+            TopicPolicies policiesGet1 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
+            TopicPolicies policiesGet2 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
+            TopicPolicies policiesGet3 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3);
+            TopicPolicies policiesGet4 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4);
+            TopicPolicies policiesGet5 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5);
+            TopicPolicies policiesGet6 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6);
+
+            Assert.assertEquals(policiesGet1, policies1);
+            Assert.assertEquals(policiesGet2, policies2);
+            Assert.assertEquals(policiesGet3, policies3);
+            Assert.assertEquals(policiesGet4, policies4);
+            Assert.assertEquals(policiesGet5, policies5);
+            Assert.assertEquals(policiesGet6, policies6);
+        });
 
         // Remove reader cache will remove policies cache
         
Assert.assertEquals(systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(),
 6);
@@ -155,13 +159,13 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         policies1.setMaxProducerPerTopic(106);
         systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, 
policies1);
 
-        Thread.sleep(1000);
-
         // reader for NAMESPACE1 will back fill the reader cache
-        policiesGet1 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
-        policiesGet2 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
-        Assert.assertEquals(policies1, policiesGet1);
-        Assert.assertEquals(policies2, policiesGet2);
+        Awaitility.await().untilAsserted(() -> {
+            TopicPolicies policiesGet1 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
+            TopicPolicies policiesGet2 = 
systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
+            Assert.assertEquals(policies1, policiesGet1);
+            Assert.assertEquals(policies2, policiesGet2);
+        });
 
         // Check reader cache is correct.
         
Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2)));
@@ -169,7 +173,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         
Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3)));
 
         // Check get without cache
-        policiesGet1 = 
systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get();
+        TopicPolicies policiesGet1 = 
systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get();
         Assert.assertEquals(policies1, policiesGet1);
     }
 

Reply via email to