This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 291ec28  [bot-cherry-pick][broker] Close topics that remain fenced forcefully (#8636)
291ec28 is described below

commit 291ec2836c3f5c20abeb722f3414e1720bafac7f
Author: lipenghui <peng...@apache.org>
AuthorDate: Fri Nov 20 11:59:42 2020 +0800

    [bot-cherry-pick][broker] Close topics that remain fenced forcefully (#8636)
    
    * [bot-cherry-pick][broker] Close topics that remain fenced forcefully
    
    * Fix conflicts.
    
    Co-authored-by: Masahiro Sakamoto <massa...@yahoo-corp.jp>
---
 conf/broker.conf                                   |  4 +++
 conf/standalone.conf                               |  4 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++++
 .../broker/service/persistent/PersistentTopic.java | 41 ++++++++++++++++++++--
 .../pulsar/broker/service/PersistentTopicTest.java | 38 ++++++++++++++++++++
 5 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2709a83..444180e 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -410,6 +410,10 @@ systemTopicEnabled=false
 # Please enable the system topic first.
 topicLevelPoliciesEnabled=false
 
+# If a topic remains fenced for this number of seconds, it will be closed forcefully.
+# If it is set to 0 or a negative number, the fenced topic will not be closed.
+topicFencingTimeoutSeconds=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 0ce88dc..7ca742a 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -350,6 +350,10 @@ systemTopicEnabled=false
 # Please enable the system topic first.
 topicLevelPoliciesEnabled=false
 
+# If a topic remains fenced for this number of seconds, it will be closed forcefully.
+# If it is set to 0 or a negative number, the fenced topic will not be closed.
+topicFencingTimeoutSeconds=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4da87d8..7d52f78 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -755,6 +755,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String zookeeperSessionExpiredPolicy = "shutdown";
 
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "If a topic remains fenced for this number of seconds, it will be closed forcefully.\n"
+                + " If it is set to 0 or a negative number, the fenced topic will not be closed."
+    )
+    private int topicFencingTimeoutSeconds = 0;
 
     /**** --- Messaging Protocols --- ****/
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f467732..198195d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -184,6 +185,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public volatile int maxUnackedMessagesOnSubscription = -1;
     private volatile boolean isClosingOrDeleting = false;
 
+    private ScheduledFuture<?> fencedTopicMonitoringTask = null;
+
     private static class TopicStatsHelper {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -353,7 +356,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     // signal to managed ledger that we are ready to resume by creating a new ledger
                     ledger.readyToCreateNewLedger();
 
-                    isFenced = false;
+                    unfence();
                 }
 
             }
@@ -380,7 +383,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         } else {
 
             // fence topic when failed to write a message to BK
-            isFenced = true;
+            fence();
             // close all producers
             List<CompletableFuture<Void>> futures = Lists.newArrayList();
             producers.values().forEach(producer -> futures.add(producer.disconnect()));
@@ -2206,6 +2209,40 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return false;
     }
 
+    private synchronized void fence() {
+        isFenced = true;
+        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
+        if (monitoringTask == null || monitoringTask.isDone()) {
+            final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
+            if (timeout > 0) {
+                this.fencedTopicMonitoringTask = brokerService.executor().schedule(this::closeFencedTopicForcefully,
+                        timeout, TimeUnit.SECONDS);
+            }
+        }
+    }
+
+    private synchronized void unfence() {
+        isFenced = false;
+        ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
+        if (monitoringTask != null && !monitoringTask.isDone()) {
+            monitoringTask.cancel(false);
+        }
+    }
+
+    private void closeFencedTopicForcefully() {
+        if (isFenced) {
+            final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
+            if (isClosingOrDeleting) {
+                log.warn("[{}] Topic remained fenced for {} seconds and is already closed (pendingWriteOps: {})", topic,
+                        timeout, pendingWriteOps.get());
+            } else {
+                log.error("[{}] Topic remained fenced for {} seconds, so close it (pendingWriteOps: {})", topic,
+                        timeout, pendingWriteOps.get());
+                close();
+            }
+        }
+    }
+
     private void fenceTopicToCloseOrDelete() {
         isClosingOrDeleting = true;
         isFenced = true;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 970277d..bd56f35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1747,6 +1748,43 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         verify(nonDeletableSubscription2, times(0)).delete();
     }
 
+    @Test
+    public void testTopicFencingTimeout() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+        PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+
+        Method fence = PersistentTopic.class.getDeclaredMethod("fence");
+        fence.setAccessible(true);
+        Method unfence = PersistentTopic.class.getDeclaredMethod("unfence");
+        unfence.setAccessible(true);
+
+        Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
+        fencedTopicMonitoringTaskField.setAccessible(true);
+        Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
+        isFencedField.setAccessible(true);
+        Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
+        isClosingOrDeletingField.setAccessible(true);
+
+        doReturn(10).when(svcConfig).getTopicFencingTimeoutSeconds();
+        fence.invoke(topic);
+        unfence.invoke(topic);
+        ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
+        assertTrue(fencedTopicMonitoringTask.isDone());
+        assertTrue(fencedTopicMonitoringTask.isCancelled());
+        assertFalse((boolean) isFencedField.get(topic));
+        assertFalse((boolean) isClosingOrDeletingField.get(topic));
+
+        doReturn(1).when(svcConfig).getTopicFencingTimeoutSeconds();
+        fence.invoke(topic);
+        Thread.sleep(2000);
+        fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
+        assertTrue(fencedTopicMonitoringTask.isDone());
+        assertFalse(fencedTopicMonitoringTask.isCancelled());
+        assertTrue((boolean) isFencedField.get(topic));
+        assertTrue((boolean) isClosingOrDeletingField.get(topic));
+    }
+
     private ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
         MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
             .setProducerName("prod-name").setSequenceId(0).build();

Reply via email to