This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new 291ec28 [bot-cherry-pick][broker] Close topics that remain fenced forcefully (#8636) 291ec28 is described below commit 291ec2836c3f5c20abeb722f3414e1720bafac7f Author: lipenghui <peng...@apache.org> AuthorDate: Fri Nov 20 11:59:42 2020 +0800 [bot-cherry-pick][broker] Close topics that remain fenced forcefully (#8636) * [bot-cherry-pick][broker] Close topics that remain fenced forcefully * Fix conflicts. Co-authored-by: Masahiro Sakamoto <massa...@yahoo-corp.jp> --- conf/broker.conf | 4 +++ conf/standalone.conf | 4 +++ .../apache/pulsar/broker/ServiceConfiguration.java | 6 ++++ .../broker/service/persistent/PersistentTopic.java | 41 ++++++++++++++++++++-- .../pulsar/broker/service/PersistentTopicTest.java | 38 ++++++++++++++++++++ 5 files changed, 91 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2709a83..444180e 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -410,6 +410,10 @@ systemTopicEnabled=false # Please enable the system topic first. topicLevelPoliciesEnabled=false +# If a topic remains fenced for this number of seconds, it will be closed forcefully. +# If it is set to 0 or a negative number, the fenced topic will not be closed. +topicFencingTimeoutSeconds=0 + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/conf/standalone.conf b/conf/standalone.conf index 0ce88dc..7ca742a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -350,6 +350,10 @@ systemTopicEnabled=false # Please enable the system topic first. topicLevelPoliciesEnabled=false +# If a topic remains fenced for this number of seconds, it will be closed forcefully. +# If it is set to 0 or a negative number, the fenced topic will not be closed. +topicFencingTimeoutSeconds=0 + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4da87d8..7d52f78 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -755,6 +755,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String zookeeperSessionExpiredPolicy = "shutdown"; + @FieldContext( + category = CATEGORY_SERVER, + doc = "If a topic remains fenced for this number of seconds, it will be closed forcefully.\n" + + " If it is set to 0 or a negative number, the fenced topic will not be closed." + ) + private int topicFencingTimeoutSeconds = 0; /**** --- Messaging Protocols --- ****/ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f467732..198195d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -184,6 +185,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public volatile int maxUnackedMessagesOnSubscription = -1; private volatile boolean isClosingOrDeleting = false; + private ScheduledFuture<?> fencedTopicMonitoringTask = null; + private static class TopicStatsHelper { public double averageMsgSize; public double aggMsgRateIn; @@ -353,7 +356,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal // signal to managed ledger that we are ready to resume by creating a new ledger ledger.readyToCreateNewLedger(); - isFenced = false; + unfence(); } } @@ -380,7 +383,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { // fence topic when failed to write a message to BK - isFenced = true; + fence(); // close all producers List<CompletableFuture<Void>> futures = Lists.newArrayList(); producers.values().forEach(producer -> futures.add(producer.disconnect())); @@ -2206,6 +2209,40 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return false; } + private synchronized void fence() { + isFenced = true; + ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask; + if (monitoringTask == null || monitoringTask.isDone()) { + final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds(); + if (timeout > 0) { + this.fencedTopicMonitoringTask = brokerService.executor().schedule(this::closeFencedTopicForcefully, + timeout, TimeUnit.SECONDS); + } + } + } + + private synchronized void unfence() { + isFenced = false; + ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask; + if (monitoringTask != null && !monitoringTask.isDone()) { + monitoringTask.cancel(false); + } + } + + private void closeFencedTopicForcefully() { + if (isFenced) { + final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds(); + if (isClosingOrDeleting) { + log.warn("[{}] Topic remained fenced for {} seconds and is already closed (pendingWriteOps: {})", topic, + timeout, pendingWriteOps.get()); + } else { + log.error("[{}] Topic remained fenced for {} seconds, so close it (pendingWriteOps: {})", topic, + timeout, pendingWriteOps.get()); + close(); + } + } + } + private void fenceTopicToCloseOrDelete() { isClosingOrDeleting = true; isFenced = true; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 970277d..bd56f35 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1747,6 +1748,43 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { verify(nonDeletableSubscription2, times(0)).delete(); } + @Test + public void testTopicFencingTimeout() throws Exception { + ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + doReturn(svcConfig).when(pulsar).getConfiguration(); + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + + Method fence = PersistentTopic.class.getDeclaredMethod("fence"); + fence.setAccessible(true); + Method unfence = PersistentTopic.class.getDeclaredMethod("unfence"); + unfence.setAccessible(true); + + Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask"); + fencedTopicMonitoringTaskField.setAccessible(true); + Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced"); + isFencedField.setAccessible(true); + Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting"); + isClosingOrDeletingField.setAccessible(true); + + doReturn(10).when(svcConfig).getTopicFencingTimeoutSeconds(); + fence.invoke(topic); + unfence.invoke(topic); + ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic); + assertTrue(fencedTopicMonitoringTask.isDone()); + assertTrue(fencedTopicMonitoringTask.isCancelled()); + assertFalse((boolean) isFencedField.get(topic)); + assertFalse((boolean) isClosingOrDeletingField.get(topic)); + + doReturn(1).when(svcConfig).getTopicFencingTimeoutSeconds(); + fence.invoke(topic); + Thread.sleep(2000); + fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic); + assertTrue(fencedTopicMonitoringTask.isDone()); + assertFalse(fencedTopicMonitoringTask.isCancelled()); + assertTrue((boolean) isFencedField.get(topic)); + assertTrue((boolean) isClosingOrDeletingField.get(topic)); + } + private ByteBuf getMessageWithMetadata(byte[] data) throws IOException { MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis()) .setProducerName("prod-name").setSequenceId(0).build();