This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5747c09cc3d1cd76a8416dab94da0b85bd2497aa
Author: zhou zhuohan <[email protected]>
AuthorDate: Thu Apr 23 20:50:39 2026 +0800

    [fix][broker] Clean up orphan ledger on concurrent initial schema creation in BookkeeperSchemaStorage (#25514)
---
 .../mledger/impl/LedgerMetadataUtils.java          | 30 +++++-----
 .../service/schema/BookkeeperSchemaStorage.java    | 44 +++++++++++++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 65 ++++++++++++++++++++++
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  2 +-
 .../bookkeeper/client/PulsarMockLedgerHandle.java  | 22 ++++++--
 5 files changed, 136 insertions(+), 27 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 4107949de51..9e1529d0683 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -29,28 +29,28 @@ import 
org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
  */
 public final class LedgerMetadataUtils {
 
-    private static final String METADATA_PROPERTY_APPLICATION = "application";
-    private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = 
"pulsar".getBytes(StandardCharsets.UTF_8);
+    public static final String METADATA_PROPERTY_APPLICATION = "application";
+    public static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = 
"pulsar".getBytes(StandardCharsets.UTF_8);
 
-    private static final String METADATA_PROPERTY_COMPONENT = "component";
-    private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
+    public static final String METADATA_PROPERTY_COMPONENT = "component";
+    public static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER =
             "managed-ledger".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
+    public static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER =
             "compacted-ledger".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = 
"schema".getBytes(StandardCharsets.UTF_8);
+    public static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = 
"schema".getBytes(StandardCharsets.UTF_8);
 
-    private static final byte[] 
METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET =
+    public static final byte[] 
METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET =
             "delayed-index-bucket".getBytes(StandardCharsets.UTF_8);
 
-    private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = 
"pulsar/managed-ledger";
-    private static final String METADATA_PROPERTY_CURSOR_NAME = 
"pulsar/cursor";
-    private static final String METADATA_PROPERTY_COMPACTEDTOPIC = 
"pulsar/compactedTopic";
-    private static final String METADATA_PROPERTY_COMPACTEDTO = 
"pulsar/compactedTo";
-    private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
+    public static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = 
"pulsar/managed-ledger";
+    public static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
+    public static final String METADATA_PROPERTY_COMPACTEDTOPIC = 
"pulsar/compactedTopic";
+    public static final String METADATA_PROPERTY_COMPACTEDTO = 
"pulsar/compactedTo";
+    public static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
 
-    private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = 
"pulsar/delayedIndexBucketKey";
-    private static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = 
"pulsar/delayedIndexTopic";
-    private static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = 
"pulsar/delayedIndexCursor";
+    public static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = 
"pulsar/delayedIndexBucketKey";
+    public static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = 
"pulsar/delayedIndexTopic";
+    public static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = 
"pulsar/delayedIndexCursor";
 
     /**
      * Build base metadata for every ManagedLedger.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 74b41c1be84..3d5ee246c22 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -355,23 +355,51 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     private CompletableFuture<Long> createNewSchema(String schemaId, byte[] 
data, byte[] hash) {
+        // Step 1: Store the schema data into a new BookKeeper ledger
         IndexEntry emptyIndex = new IndexEntry();
         emptyIndex.setVersion(0);
         emptyIndex.setHash(hash);
         emptyIndex.setPosition().setEntryId(-1L).setLedgerId(-1L);
+        CompletableFuture<PositionInfo> stored = addNewSchemaEntryToStore(
+                schemaId, Collections.singletonList(emptyIndex), data
+        );
 
-        return addNewSchemaEntryToStore(schemaId, 
Collections.singletonList(emptyIndex), data).thenCompose(position -> {
-            // The schema was stored in the ledger, now update the z-node with 
the pointer to it
+        return stored.thenCompose(position -> {
+            // Step 2: Create the schema locator z-node pointing to the ledger
             IndexEntry info = new IndexEntry();
             info.setVersion(0);
             info.setPosition().copyFrom(position);
             info.setHash(hash);
-
             SchemaLocator locator = new SchemaLocator();
             locator.setInfo().copyFrom(info);
             locator.addIndex().copyFrom(info);
-            return createSchemaLocator(getSchemaPath(schemaId), locator)
-                            .thenApply(ignore -> 0L);
+            CompletableFuture<Long> created = createSchemaLocator(
+                    getSchemaPath(schemaId), locator).thenApply(ignore -> 0L);
+
+            // Step 3: Handle failure by cleaning up the orphan ledger
+            // if concurrent schema creation caused a CAS conflict
+            return created.whenComplete((__, ex) -> {
+                if (ex == null) {
+                    return;
+                }
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.warn()
+                        .attr("schemaId", schemaId)
+                        .attr("ledgerId", position.getLedgerId())
+                        .exception(cause)
+                        .log("Failed to create schema locator with position");
+                if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
+                    bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, 
ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            log.warn()
+                                    .attr("schemaId", schemaId)
+                                    .attr("ledgerId", position.getLedgerId())
+                                    .attr("rc", rc)
+                                    .log("Failed to delete orphan ledger after 
schema locator creation failed");
+                        }
+                    }, null);
+                }
+            });
         });
     }
 
@@ -484,7 +512,11 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
             if (ex != null) {
                 Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                log.warn("[{}] Failed to update schema locator with position 
{}", schemaId, position, cause);
+                log.warn()
+                        .attr("schemaId", schemaId)
+                        .attr("ledgerId", position.getLedgerId())
+                        .exception(cause)
+                        .log("Failed to update schema locator with position");
                 if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
                     bookKeeper.asyncDeleteLedger(position.getLedgerId(), new 
AsyncCallback.DeleteCallback() {
                         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 393ed0a84b1..23a61636072 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -55,6 +55,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
@@ -1372,6 +1373,70 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         producers2.clear();
     }
 
+    /**
+     * Test that when multiple producers concurrently create schema for a 
brand-new topic,
+     * the orphan BookKeeper ledgers created by the losing requests are 
properly cleaned up.
+     */
+    @Test
+    public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, 
"testConcurrentCreateSchemaNoOrphanLedger");
+        final String schemaName = TopicName.get(topic).getSchemaName();
+
+        org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
+                (org.apache.bookkeeper.client.PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+
+        // Concurrently create producers with the same schema on a brand-new 
topic
+        int concurrency = 16;
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
+                Collections.synchronizedList(new ArrayList<>(concurrency));
+        CountDownLatch latch = new CountDownLatch(concurrency);
+        for (int i = 0; i < concurrency; i++) {
+            executor.execute(() -> {
+                try {
+                    
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                            .topic(topic).createAsync());
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        FutureUtil.waitForAll(producers).join();
+
+        // Verify only 1 schema version exists
+        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+        // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" 
matches this topic's schemaName.
+        // If orphan ledgers were not cleaned up, there would be more than 1.
+        int schemaLedgerCount = 0;
+        for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : 
mockBk.getLedgerMap().values()) {
+            Map<String, byte[]> metadata = 
lh.getLedgerMetadata().getCustomMetadata();
+            byte[] schemaIdBytes = 
metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID);
+            if (schemaIdBytes != null
+                    && schemaName.equals(new String(schemaIdBytes, 
java.nio.charset.StandardCharsets.UTF_8))) {
+                schemaLedgerCount++;
+            }
+        }
+        assertEquals(schemaLedgerCount, 1,
+                "Expected exactly 1 schema ledger for the topic, but found "
+                        + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+
+        // Cleanup
+        producers.forEach(p -> {
+            try {
+                p.join().close();
+            } catch (Exception ignore) {
+            }
+        });
+        producers.clear();
+    }
+
     @EqualsAndHashCode
     static class User implements Serializable {
         private String name;
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 58fddb07ff9..1cc7765fb79 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -137,7 +137,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
                     long id = sequence.getAndIncrement();
                     log.info("Creating ledger {}", id);
                     PulsarMockLedgerHandle lh =
-                            new 
PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd);
+                            new 
PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd, 
properties);
                     ledgers.put(id, lh);
                     return FutureUtils.value(lh);
                 } catch (Throwable t) {
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 74dab3ebbfe..a5403774133 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -69,7 +70,14 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
 
     public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
                            DigestType digest, byte[] passwd) throws 
GeneralSecurityException {
-        super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, 
digest, passwd), new LongVersion(0L)),
+        this(bk, id, digest, passwd, Collections.emptyMap());
+    }
+
+    public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+                           DigestType digest, byte[] passwd,
+                           Map<String, byte[]> customMetadata) throws 
GeneralSecurityException {
+        super(bk.getClientCtx(), id,
+              new Versioned<>(createMetadata(id, digest, passwd, 
customMetadata), new LongVersion(0L)),
               digest, passwd, WriteFlag.NONE);
         this.bk = bk;
         this.id = id;
@@ -274,14 +282,18 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
         return readHandle.readLastAddConfirmedAndEntryAsync(entryId, 
timeOutInMillis, parallel);
     }
 
-    private static LedgerMetadata createMetadata(long id, DigestType digest, 
byte[] passwd) {
+    private static LedgerMetadata createMetadata(long id, DigestType digest, 
byte[] passwd,
+                                                   Map<String, byte[]> 
customMetadata) {
         List<BookieId> ensemble = new 
ArrayList<>(PulsarMockBookKeeper.getMockEnsemble());
-        return LedgerMetadataBuilder.create()
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
             .withDigestType(digest.toApiDigestType())
             .withPassword(passwd)
             .withId(id)
-            .newEnsembleEntry(0L, ensemble)
-            .build();
+            .newEnsembleEntry(0L, ensemble);
+        if (customMetadata != null && !customMetadata.isEmpty()) {
+            builder.withCustomMetadata(customMetadata);
+        }
+        return builder.build();
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarMockLedgerHandle.class);

Reply via email to