gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1115143826
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -851,28 +985,51 @@ private void doCleanup(String broker) {
log.error("Failed to flush the in-flight messages.", e);
}
- if (serviceUnitTombstoneCnt > 0) {
- this.totalCleanupCnt++;
- this.totalServiceUnitCleanupTombstoneCnt +=
serviceUnitTombstoneCnt;
- this.totalBrokerCleanupTombstoneCnt++;
+ if (orphanServiceUnitCleanupCnt > 0) {
+ this.totalOrphanServiceUnitCleanupCnt +=
orphanServiceUnitCleanupCnt;
+ this.totalInactiveBrokerCleanupCnt++;
}
double cleanupTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
// TODO: clean load data stores
log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
- + "Published tombstone for orphan service units:
serviceUnitTombstoneCnt:{}, "
+ + "Cleaned up orphan service units:
orphanServiceUnitCleanupCnt:{}, "
+ "approximate cleanupErrorCnt:{}, metrics:{} ",
broker,
cleanupTime,
- serviceUnitTombstoneCnt,
+ orphanServiceUnitCleanupCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
printCleanupMetrics());
cleanupJobs.remove(broker);
}
- // TODO: integrate this monitor logic when broker registry is added
- private void monitorOwnerships(List<String> brokers) {
+ private Optional<ServiceUnitStateData> getOverrideStateData(String
serviceUnit, ServiceUnitStateData orphanData,
+ Set<String>
availableBrokers,
+
LoadManagerContext context) {
+ if (isTransferCommand(orphanData)) {
+ // rollback to the src
+ return Optional.of(new ServiceUnitStateData(Owned,
orphanData.sourceBroker(), true));
+ } else if (orphanData.state() == Assigning) { // assign
+ // roll-forward to another broker
Review Comment:
Oh, sorry, thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org