This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 41e279f1f04 [fix][broker] Fix the broker close hanged issue. (#15755)
41e279f1f04 is described below
commit 41e279f1f0401af536133934138bc6e0beb69233
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed May 25 19:53:16 2022 -0700
[fix][broker] Fix the broker close hanged issue. (#15755)
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 15 +++++++--------
.../src/main/java/org/apache/zookeeper/MockZooKeeper.java | 6 ++++++
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e3349a9f964..6a29f886917 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -438,13 +438,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
* getConfiguration()
.getBrokerShutdownTimeoutMs())));
- // shutdown loadmanager before shutting down the broker
- executorServicesShutdown.shutdown(loadManagerExecutor);
- LoadManager loadManager = this.loadManager.get();
- if (loadManager != null) {
- loadManager.stop();
- }
-
List<CompletableFuture<Void>> asyncCloseFutures = new
ArrayList<>();
if (this.brokerService != null) {
CompletableFuture<Void> brokerCloseFuture =
this.brokerService.closeAsync();
@@ -479,6 +472,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.leaderElectionService = null;
}
+ // shutdown loadmanager before shutting down the broker
+ executorServicesShutdown.shutdown(loadManagerExecutor);
+
if (adminClient != null) {
adminClient.close();
adminClient = null;
@@ -505,7 +501,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
executorServicesShutdown.shutdown(orderedExecutor);
executorServicesShutdown.shutdown(cacheExecutor);
-
+ LoadManager loadManager = this.loadManager.get();
+ if (loadManager != null) {
+ loadManager.stop();
+ }
if (schemaRegistryService != null) {
schemaRegistryService.close();
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index cd0c60c0087..4e7501c9676 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -394,6 +394,9 @@ public class MockZooKeeper extends ZooKeeper {
KeeperState.SyncConnected,
parent)));
}
+ } catch (Exception ex) {
+ log.error("create path : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null);
} finally {
unlockIfLocked();
}
@@ -962,6 +965,9 @@ public class MockZooKeeper extends ZooKeeper {
parent)));
triggerPersistentWatches(path, parent,
EventType.NodeDeleted);
}
+ } catch (Exception ex) {
+ log.error("delete path : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx);
} finally {
unlockIfLocked();
}