This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6917645574f5e8247df9d9f0cc242017319899ef Author: Lari Hotari <[email protected]> AuthorDate: Wed Dec 10 16:38:50 2025 +0200 [fix][broker] Fix various error-prone detected errors mainly in logging and String.format parameters (#25059) (cherry picked from commit 39ea19e41d6cfaa3049b7beca4480775d20d097e) --- .../main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 2 +- .../broker/loadbalance/extensions/scheduler/SplitScheduler.java | 2 +- .../strategy/DefaultNamespaceBundleSplitStrategyImpl.java | 2 +- .../pulsar/broker/service/PulsarMetadataEventSynchronizer.java | 8 +++++--- .../pulsar/broker/service/persistent/GeoPersistentReplicator.java | 2 +- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 5 ++--- .../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java | 3 +-- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 51a665b0a0c..a8ed06c7fcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1163,7 +1163,7 @@ public class Namespaces extends NamespacesBase { internalDeleteSubscriptionDispatchRateAsync() .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - log.error("Failed to delete the subscription dispatchRate for cluster on namespace {}", + log.error("[{}] Failed to delete the subscription dispatchRate for cluster on namespace {}", clientAppId(), namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java index 816fde0038a..70887ab2325 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java @@ -116,7 +116,7 @@ public class SplitScheduler implements LoadManagerScheduler { synchronized (bundleSplitStrategy) { final Set<SplitDecision> decisions = bundleSplitStrategy.findBundlesToSplit(context, pulsar); if (debugMode) { - log.info("Split Decisions:", decisions); + log.info("Split Decisions: {}", decisions); } if (!decisions.isEmpty()) { // currently following the unloading timeout diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java index 7875c07b122..6cc4293143e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java @@ -244,7 +244,7 @@ public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleS .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } catch (Throwable e) { counter.update(Failure, Unknown); - log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to get split boundaries.", bundle, e)); + log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to get split boundaries.", bundle), e); continue; } if (splitBoundary == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java index a9564642c1a..a5fac333ae5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java @@ -228,11 +228,13 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize log.info("successfully created consumer {}", topicName); } else { State stateTransient = state; - log.info("[{}] Closing the new consumer because the synchronizer state is {}", stateTransient); + log.info("[{}] Closing the new consumer because the synchronizer state is {}", topicName, + stateTransient); CompletableFuture closeConsumer = new CompletableFuture<>(); closeResource(() -> consumer.closeAsync(), closeConsumer); closeConsumer.thenRun(() -> { - log.info("[{}] Closed the new consumer because the synchronizer state is {}", stateTransient); + log.info("[{}] Closed the new consumer because the synchronizer state is {}", topicName, + stateTransient); }); } }).exceptionally(ex -> { @@ -317,7 +319,7 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize } // Retry. long waitTimeMs = backOff.next(); - log.warn("[{}] Exception: '{}' occurred while trying to close the %s. Retrying again in {} s.", + log.warn("[{}] Exception: '{}' occurred while trying to close the {}. Retrying again in {} s.", topicName, ex.getMessage(), asyncCloseable.getClass().getSimpleName(), waitTimeMs / 1000.0, ex); brokerService.executor().schedule(() -> closeResource(asyncCloseable, future), waitTimeMs, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index 437067edb69..46f8a27d580 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -65,7 +65,7 @@ public class GeoPersistentReplicator extends PersistentReplicator { if (metadata.partitions == 0) { topicCheckFuture.complete(null); } else { - String errorMsg = String.format("{} Can not create the replicator due to the partitions in the" + String errorMsg = String.format("%s Can not create the replicator due to the partitions in the" + " remote cluster is not 0, but is %s", replicatorId, metadata.partitions); log.error(errorMsg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9c9294cc5de..e7c075243ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2918,13 +2918,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal stats.topicCreationTimeStamp = getTopicCreationTimeStamp(); stats.compaction.reset(); - mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> { + mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).ifPresent(compactionRecord -> { stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount(); stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp(); stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp(); stats.compaction.lastCompactionDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills(); - return compactionRecord; }); Map<String, CompletableFuture<SubscriptionStatsImpl>> subscriptionFutures = new HashMap<>(); @@ -2972,7 +2971,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } if (getStatsOptions.isGetEarliestTimeInBacklog() && stats.backlogSize != 0) { - CompletableFuture finalRes = ledger.getEarliestMessagePublishTimeInBacklog() + CompletableFuture<TopicStatsImpl> finalRes = ledger.getEarliestMessagePublishTimeInBacklog() .thenApply((earliestTime) -> { stats.earliestMsgPublishTimeInBacklogs = earliestTime; return stats; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 1736cd3840d..1531672bef1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -326,7 +326,7 @@ public class NamespaceStatsAggregator { compactorMXBean .flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName())) - .map(compactionRecord -> { + .ifPresent(compactionRecord -> { stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount(); stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount(); stats.compactionFailedCount = compactionRecord.getCompactionFailedCount(); @@ -346,7 +346,6 @@ public class NamespaceStatsAggregator { stats.compactionCompactedEntriesCount = entries; stats.compactionCompactedEntriesSize = size; } - return compactionRecord; }); }
