This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 08555f05b31 [fix][broker] Wait for orphan schema ledger cleanup before retry (#25579)
08555f05b31 is described below

commit 08555f05b3168d9e75832c12f84973d34903a6c8
Author: sinan liu <[email protected]>
AuthorDate: Mon Apr 27 11:30:24 2026 +0800

    [fix][broker] Wait for orphan schema ledger cleanup before retry (#25579)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 85 +++++++++++----------
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 88 +++++++++++++++++-----
 2 files changed, 114 insertions(+), 59 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index ea6537e1e1e..cf9c3467ae6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -355,37 +354,32 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                 addNewSchemaEntryToStore(schemaId, 
Collections.singletonList(emptyIndex), data);
 
         return stored.thenCompose(position -> {
-            // The schema was stored in the ledger, now update the z-node with 
the pointer to it
+            // Step 2: Create the schema locator z-node pointing to the ledger
             SchemaStorageFormat.IndexEntry info = 
SchemaStorageFormat.IndexEntry.newBuilder()
                     .setVersion(0)
                     .setPosition(position)
                     .setHash(copyFrom(hash))
                     .build();
 
-            CompletableFuture<Long> created = 
createSchemaLocator(getSchemaPath(schemaId),
-                    SchemaStorageFormat.SchemaLocator.newBuilder()
+            SchemaStorageFormat.SchemaLocator locator = 
SchemaStorageFormat.SchemaLocator.newBuilder()
                     .setInfo(info)
-                    .addAllIndex(
-                            newArrayList(info))
-                    .build())
-                    .thenApply(ignore -> 0L);
-
-            return created.whenComplete((__, ex) -> {
-                if (ex == null) {
-                    return;
-                }
-                Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                log.warn("[{}] Failed to create schema locator with ledger 
{}", schemaId, position.getLedgerId(),
-                        cause);
-                if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
-                    bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, 
ctx) -> {
-                        if (rc != BKException.Code.OK) {
-                            log.warn("[{}] Failed to delete orphan ledger {} 
after schema locator creation failed, "
-                                    + "rc: {}", schemaId, 
position.getLedgerId(), rc);
+                    .addAllIndex(newArrayList(info))
+                    .build();
+            // Step 3: Handle failure by cleaning up the orphan ledger
+            // if concurrent schema creation caused a CAS conflict
+            return createSchemaLocator(getSchemaPath(schemaId), locator)
+                    .thenApply(ignore -> 0L)
+                    .exceptionallyCompose(ex -> {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        log.warn("[{}] Failed to create schema locator with 
ledger {}", schemaId,
+                                position.getLedgerId(), cause);
+                        if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
+                            return deleteLedgerAsync(schemaId, 
position.getLedgerId(),
+                                    "schema locator creation failed")
+                                    .thenCompose(__ -> 
FutureUtil.failedFuture(cause));
                         }
-                    }, null);
-                }
-            });
+                        return FutureUtil.failedFuture(cause);
+                    });
         });
     }
 
@@ -496,26 +490,37 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                         .addAllIndex(indexList)
                         .build()
                 , locatorEntry.version
-        ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
-            if (ex != null) {
-                Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                log.warn("[{}] Failed to update schema locator with ledger 
{}", schemaId, position.getLedgerId(),
-                        cause);
-                if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
-                    bookKeeper.asyncDeleteLedger(position.getLedgerId(), new 
AsyncCallback.DeleteCallback() {
-                        @Override
-                        public void deleteComplete(int rc, Object ctx) {
-                            if (rc != BKException.Code.OK) {
-                                log.warn("[{}] Failed to delete ledger {} 
after updating schema locator failed, rc: {}",
-                                    schemaId, position.getLedgerId(), rc);
-                            }
-                        }
-                    }, null);
-                }
+        ).thenApply(ignore -> nextVersion).exceptionallyCompose(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.warn("[{}] Failed to update schema locator with ledger {}", 
schemaId, position.getLedgerId(),
+                    cause);
+            if (cause instanceof AlreadyExistsException || cause instanceof 
BadVersionException) {
+                return deleteLedgerAsync(schemaId, position.getLedgerId(),
+                        "schema locator update failed")
+                        .thenCompose(__ -> FutureUtil.failedFuture(cause));
             }
+            return FutureUtil.failedFuture(cause);
         });
     }
 
+    private CompletableFuture<Void> deleteLedgerAsync(String schemaId, long 
ledgerId, String reason) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+                if (rc != BKException.Code.OK) {
+                    log.warn("[{}] Failed to delete ledger {} after {}, rc: 
{}",
+                            schemaId, ledgerId, reason, rc);
+                }
+                future.complete(null);
+            }, null);
+        } catch (Throwable t) {
+            log.warn("[{}] Failed to delete ledger {} after {}",
+                    schemaId, ledgerId, reason, t);
+            future.complete(null);
+        }
+        return future;
+    }
+
     @NonNull
     private CompletableFuture<SchemaStorageFormat.SchemaEntry> 
findSchemaEntryByVersion(
         List<SchemaStorageFormat.IndexEntry> index,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 286937cc42f..781709e672e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -57,6 +57,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -1485,48 +1487,96 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         final String topic = getTopicName(ns, 
"testConcurrentCreateSchemaNoOrphanLedger");
         final String schemaName = TopicName.get(topic).getSchemaName();
 
-        org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
-                (org.apache.bookkeeper.client.PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+        PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
 
         // Concurrently create producers with the same schema on a brand-new 
topic
         int concurrency = 16;
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = 
createProducersInParallel(
+                topic, Schema.AVRO(Schemas.PersonOne.class), concurrency);
+        try {
+            FutureUtil.waitForAll(producers).join();
+
+            // Verify only 1 schema version exists
+            assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+            int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName);
+            assertEquals(schemaLedgerCount, 1,
+                    "Expected exactly 1 schema ledger for the topic, but found 
"
+                            + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+        } finally {
+            closeProducers(producers);
+        }
+    }
+
+    /**
+     * Test that concurrent compatible schema updates clean up ledgers created 
by requests
+     * that lose the schema locator CAS race.
+     */
+    @Test
+    public void testConcurrentUpdateSchemaNoOrphanLedger() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, 
"testConcurrentUpdateSchemaNoOrphanLedger");
+        final String schemaName = TopicName.get(topic).getSchemaName();
+        PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+
+        @Cleanup
+        Producer<Schemas.PersonOne> initialProducer = pulsarClient
+                .newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic)
+                .create();
+        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+        int concurrency = 16;
+        List<CompletableFuture<Producer<Schemas.PersonThree>>> producers = 
createProducersInParallel(
+                topic, Schema.AVRO(Schemas.PersonThree.class), concurrency);
+        try {
+            FutureUtil.waitForAll(producers).join();
+
+            assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
+            int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName);
+            assertEquals(schemaLedgerCount, 2,
+                    "Expected exactly 2 schema ledgers for the topic, but 
found "
+                            + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+        } finally {
+            closeProducers(producers);
+        }
+    }
+
+    private <T> List<CompletableFuture<Producer<T>>> createProducersInParallel(
+            String topic, Schema<T> schema, int concurrency) throws 
InterruptedException {
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
-                Collections.synchronizedList(new ArrayList<>(concurrency));
+        List<CompletableFuture<Producer<T>>> producers = 
Collections.synchronizedList(new ArrayList<>(concurrency));
         CountDownLatch latch = new CountDownLatch(concurrency);
         for (int i = 0; i < concurrency; i++) {
             executor.execute(() -> {
                 try {
-                    
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                            .topic(topic).createAsync());
+                    
producers.add(pulsarClient.newProducer(schema).topic(topic).createAsync());
                 } finally {
                     latch.countDown();
                 }
             });
         }
         latch.await();
-        FutureUtil.waitForAll(producers).join();
-
-        // Verify only 1 schema version exists
-        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+        return producers;
+    }
 
-        // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" 
matches this topic's schemaName.
-        // If orphan ledgers were not cleaned up, there would be more than 1.
+    private int countSchemaLedgers(PulsarMockBookKeeper mockBk, String 
schemaName) {
         int schemaLedgerCount = 0;
-        for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : 
mockBk.getLedgerMap().values()) {
+        for (PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) {
             Map<String, byte[]> metadata = 
lh.getLedgerMetadata().getCustomMetadata();
             byte[] schemaIdBytes = 
metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID);
-            if (schemaIdBytes != null
-                    && schemaName.equals(new String(schemaIdBytes, 
java.nio.charset.StandardCharsets.UTF_8))) {
+            if (schemaIdBytes != null && schemaName.equals(new 
String(schemaIdBytes, StandardCharsets.UTF_8))) {
                 schemaLedgerCount++;
             }
         }
-        assertEquals(schemaLedgerCount, 1,
-                "Expected exactly 1 schema ledger for the topic, but found "
-                        + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+        return schemaLedgerCount;
+    }
 
-        // Cleanup
+    private <T> void closeProducers(List<CompletableFuture<Producer<T>>> 
producers) {
         producers.forEach(p -> {
             try {
                 p.join().close();

Reply via email to