This is an automated email from the ASF dual-hosted git repository.

coderzc pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6d7a22bfc57bfc1d5f61463320dbb2866b132271
Author: zhou zhuohan <[email protected]>
AuthorDate: Thu Apr 23 20:50:39 2026 +0800

    [fix][broker] Clean up orphan ledger on concurrent initial schema creation in BookkeeperSchemaStorage (#25514)
---
 .../mledger/impl/LedgerMetadataUtils.java          | 30 +++++-----
 .../service/schema/BookkeeperSchemaStorage.java    | 32 +++++++++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 65 ++++++++++++++++++++++
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  2 +-
 .../bookkeeper/client/PulsarMockLedgerHandle.java  | 22 ++++++--
 5 files changed, 125 insertions(+), 26 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 4ac409a2e9b..13157ad4e38 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -29,28 +29,28 @@ import 
org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
  */
 public final class LedgerMetadataUtils {
 
-    private static final String METADATA_PROPERTY_APPLICATION = "application";
-    private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = 
"pulsar".getBytes(StandardCharsets.UTF_8);
+    public static final String METADATA_PROPERTY_APPLICATION = "application";
+    public static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = 
"pulsar".getBytes(StandardCharsets.UTF_8);
 
-    private static final String METADATA_PROPERTY_COMPONENT = "component";
-    private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
+    public static final String METADATA_PROPERTY_COMPONENT = "component";
+    public static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
             "managed-ledger".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
+    public static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
             "compacted-ledger".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = 
"schema".getBytes(StandardCharsets.UTF_8);
+    public static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = 
"schema".getBytes(StandardCharsets.UTF_8);
 
-    private static final byte[] 
METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET =
+    public static final byte[] 
METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET =
             "delayed-index-bucket".getBytes(StandardCharsets.UTF_8);
 
-    private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = 
"pulsar/managed-ledger";
-    private static final String METADATA_PROPERTY_CURSOR_NAME = 
"pulsar/cursor";
-    private static final String METADATA_PROPERTY_COMPACTEDTOPIC = 
"pulsar/compactedTopic";
-    private static final String METADATA_PROPERTY_COMPACTEDTO = 
"pulsar/compactedTo";
-    private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
+    public static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = 
"pulsar/managed-ledger";
+    public static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
+    public static final String METADATA_PROPERTY_COMPACTEDTOPIC = 
"pulsar/compactedTopic";
+    public static final String METADATA_PROPERTY_COMPACTEDTO = 
"pulsar/compactedTo";
+    public static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
 
-    private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = 
"pulsar/delayedIndexBucketKey";
-    private static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = 
"pulsar/delayedIndexTopic";
-    private static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = 
"pulsar/delayedIndexCursor";
+    public static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = 
"pulsar/delayedIndexBucketKey";
+    public static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = 
"pulsar/delayedIndexTopic";
+    public static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = 
"pulsar/delayedIndexCursor";
 
     /**
      * Build base metadata for every ManagedLedger.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index e38bf48f1fd..ea6537e1e1e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -351,7 +351,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                                 .setLedgerId(-1L)
                         ).build();
 
-        return addNewSchemaEntryToStore(schemaId, 
Collections.singletonList(emptyIndex), data).thenCompose(position -> {
+        CompletableFuture<SchemaStorageFormat.PositionInfo> stored =
+                addNewSchemaEntryToStore(schemaId, 
Collections.singletonList(emptyIndex), data);
+
+        return stored.thenCompose(position -> {
             // The schema was stored in the ledger, now update the z-node with 
the pointer to it
             SchemaStorageFormat.IndexEntry info = 
SchemaStorageFormat.IndexEntry.newBuilder()
                     .setVersion(0)
@@ -359,12 +362,30 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                     .setHash(copyFrom(hash))
                     .build();
 
-            return createSchemaLocator(getSchemaPath(schemaId), 
SchemaStorageFormat.SchemaLocator.newBuilder()
+            CompletableFuture<Long> created = 
createSchemaLocator(getSchemaPath(schemaId),
+                    SchemaStorageFormat.SchemaLocator.newBuilder()
                     .setInfo(info)
                     .addAllIndex(
                             newArrayList(info))
                     .build())
-                            .thenApply(ignore -> 0L);
+                    .thenApply(ignore -> 0L);
+
+            return created.whenComplete((__, ex) -> {
+                if (ex == null) {
+                    return;
+                }
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.warn("[{}] Failed to create schema locator with ledger 
{}", schemaId, position.getLedgerId(),
+                        cause);
+                if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
+                    bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, 
ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            log.warn("[{}] Failed to delete orphan ledger {} 
after schema locator creation failed, "
+                                    + "rc: {}", schemaId, 
position.getLedgerId(), rc);
+                        }
+                    }, null);
+                }
+            });
         });
     }
 
@@ -478,7 +499,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
             if (ex != null) {
                 Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                log.warn("[{}] Failed to update schema locator with position 
{}", schemaId, position, cause);
+                log.warn("[{}] Failed to update schema locator with ledger 
{}", schemaId, position.getLedgerId(),
+                        cause);
                 if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
                     bookKeeper.asyncDeleteLedger(position.getLedgerId(), new 
AsyncCallback.DeleteCallback() {
                         @Override
@@ -762,4 +784,4 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             throw t instanceof CompletionException ? (CompletionException) t : 
new CompletionException(t);
         });
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 9bf1f5e4048..286937cc42f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -57,6 +57,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
@@ -1471,6 +1472,70 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         producers2.clear();
     }
 
+    /**
+     * Test that when multiple producers concurrently create schema for a 
brand-new topic,
+     * the orphan BookKeeper ledgers created by the losing requests are 
properly cleaned up.
+     */
+    @Test
+    public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, 
"testConcurrentCreateSchemaNoOrphanLedger");
+        final String schemaName = TopicName.get(topic).getSchemaName();
+
+        org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
+                (org.apache.bookkeeper.client.PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+
+        // Concurrently create producers with the same schema on a brand-new 
topic
+        int concurrency = 16;
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
+                Collections.synchronizedList(new ArrayList<>(concurrency));
+        CountDownLatch latch = new CountDownLatch(concurrency);
+        for (int i = 0; i < concurrency; i++) {
+            executor.execute(() -> {
+                try {
+                    
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                            .topic(topic).createAsync());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        FutureUtil.waitForAll(producers).join();
+
+        // Verify only 1 schema version exists
+        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+        // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" 
matches this topic's schemaName.
+        // If orphan ledgers were not cleaned up, there would be more than 1.
+        int schemaLedgerCount = 0;
+        for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : 
mockBk.getLedgerMap().values()) {
+            Map<String, byte[]> metadata = 
lh.getLedgerMetadata().getCustomMetadata();
+            byte[] schemaIdBytes = 
metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID);
+            if (schemaIdBytes != null
+                    && schemaName.equals(new String(schemaIdBytes, 
java.nio.charset.StandardCharsets.UTF_8))) {
+                schemaLedgerCount++;
+            }
+        }
+        assertEquals(schemaLedgerCount, 1,
+                "Expected exactly 1 schema ledger for the topic, but found "
+                        + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+
+        // Cleanup
+        producers.forEach(p -> {
+            try {
+                p.join().close();
+            } catch (Exception ignore) {
+            }
+        });
+        producers.clear();
+    }
+
     @EqualsAndHashCode
     static class User implements Serializable {
         private String name;
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 54b8853fb0d..7104ded7460 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -133,7 +133,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
                     long id = sequence.getAndIncrement();
                     log.info("Creating ledger {}", id);
                     PulsarMockLedgerHandle lh =
-                            new 
PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd);
+                            new 
PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd, 
properties);
                     ledgers.put(id, lh);
                     return FutureUtils.value(lh);
                 } catch (Throwable t) {
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 2f5ded0fe01..4d1fd1380c8 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -69,7 +70,14 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
 
     public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
                            DigestType digest, byte[] passwd) throws 
GeneralSecurityException {
-        super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, 
digest, passwd), new LongVersion(0L)),
+        this(bk, id, digest, passwd, Collections.emptyMap());
+    }
+
+    public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+                           DigestType digest, byte[] passwd,
+                           Map<String, byte[]> customMetadata) throws 
GeneralSecurityException {
+        super(bk.getClientCtx(), id,
+              new Versioned<>(createMetadata(id, digest, passwd, 
customMetadata), new LongVersion(0L)),
               digest, passwd, WriteFlag.NONE);
         this.bk = bk;
         this.id = id;
@@ -268,14 +276,18 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
         return readHandle.readLastAddConfirmedAndEntryAsync(entryId, 
timeOutInMillis, parallel);
     }
 
-    private static LedgerMetadata createMetadata(long id, DigestType digest, 
byte[] passwd) {
+    private static LedgerMetadata createMetadata(long id, DigestType digest, 
byte[] passwd,
+                                                   Map<String, byte[]> 
customMetadata) {
         List<BookieId> ensemble = new 
ArrayList<>(PulsarMockBookKeeper.getMockEnsemble());
-        return LedgerMetadataBuilder.create()
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
             .withDigestType(digest.toApiDigestType())
             .withPassword(passwd)
             .withId(id)
-            .newEnsembleEntry(0L, ensemble)
-            .build();
+            .newEnsembleEntry(0L, ensemble);
+        if (customMetadata != null && !customMetadata.isEmpty()) {
+            builder.withCustomMetadata(customMetadata);
+        }
+        return builder.build();
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarMockLedgerHandle.class);

Reply via email to