This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 34e38f0e8b1 [fix][client] Fix exclusive V5 scalable topic producer initial segment claim race condition (#25778)
34e38f0e8b1 is described below
commit 34e38f0e8b16e4d7c74c33db139e341bf99167eb
Author: Philipp Dolif <[email protected]>
AuthorDate: Fri May 15 00:22:17 2026 +0200
[fix][client] Fix exclusive V5 scalable topic producer initial segment claim race condition (#25778)
---
.../java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 0fe745ff2fa..8d88490794e 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -389,7 +389,7 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
// that segment will surface the error via the normal PulsarClientException
// path. (The initial-create path uses {@link #eagerAttachInitialAsync} for
// strict claim.)
- if (requiresExclusiveAttach()) {
+ if (requiresExclusiveAttach() && oldLayout != null) {
CompletableFuture.runAsync(() -> {
for (var seg : newLayout.activeSegments()) {
if (segmentProducers.containsKey(seg.segmentId())) {
@@ -449,9 +449,6 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
}
return CompletableFuture.runAsync(() -> {
for (var seg : activeSegments) {
- if (segmentProducers.containsKey(seg.segmentId())) {
- continue;
- }
try {
getOrCreateSegmentProducer(seg.segmentId());
} catch (PulsarClientException e) {