This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ca3e8a40730057ad374a3dfc595037333c7aca1d
Author: sinan liu <[email protected]>
AuthorDate: Mon Apr 27 11:30:24 2026 +0800

    [fix][broker] Wait for orphan schema ledger cleanup before retry (#25579)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 88 +++++++++++++---------
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 88 +++++++++++++++++-----
 2 files changed, 121 insertions(+), 55 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 9b71daff91f..d92bd8a2bc7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -39,7 +39,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
+<<<<<<< HEAD
 import org.apache.bookkeeper.client.AsyncCallback;
+=======
+import lombok.CustomLog;
+>>>>>>> f07decea04 ([fix][broker] Wait for orphan schema ledger cleanup before 
retry (#25579))
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -373,28 +377,21 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             SchemaLocator locator = new SchemaLocator();
             locator.setInfo().copyFrom(info);
             locator.addIndex().copyFrom(info);
-            CompletableFuture<Long> created = createSchemaLocator(
-                    getSchemaPath(schemaId), locator).thenApply(ignore -> 0L);
-
             // Step 3: Handle failure by cleaning up the orphan ledger
             // if concurrent schema creation caused a CAS conflict
-            return created.whenComplete((__, ex) -> {
-                if (ex == null) {
-                    return;
-                }
-                Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                log.warn("Failed to create schema locator with position, 
schemaId {}, ledgerId {}",
-                        schemaId, position.getLedgerId(), ex);
-                if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
-                    bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, 
ctx) -> {
-                        if (rc != BKException.Code.OK) {
-                            log.warn("Failed to delete orphan ledger after 
schema locator creation"
-                                    + " failed, schemaId {}, ledgerId {}, rc: 
{}",
-                                    schemaId, position.getLedgerId(), rc);
+            return createSchemaLocator(getSchemaPath(schemaId), locator)
+                    .thenApply(ignore -> 0L)
+                    .exceptionallyCompose(ex -> {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        log.warn("Failed to create schema locator with 
position, schemaId {}, ledgerId {}",
+                                        schemaId, position.getLedgerId(), 
cause);
+                        if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
+                            return deleteLedgerAsync(schemaId, 
position.getLedgerId(),
+                                    "schema locator creation failed")
+                                    .thenCompose(__ -> 
FutureUtil.failedFuture(cause));
                         }
-                    }, null);
-                }
-            });
+                        return FutureUtil.failedFuture(cause);
+                    });
         });
     }
 
@@ -504,26 +501,45 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return updateSchemaLocator(getSchemaPath(schemaId),
                 newLocator
                 , locatorEntry.version
-        ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
-            if (ex != null) {
-                Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                log.warn("Failed to update schema locator with position, 
schemaId {}, ledgerId {}",
-                        schemaId, position.getLedgerId(), ex);
-                if (cause instanceof AlreadyExistsException || cause 
instanceof BadVersionException) {
-                    bookKeeper.asyncDeleteLedger(position.getLedgerId(), new 
AsyncCallback.DeleteCallback() {
-                        @Override
-                        public void deleteComplete(int rc, Object ctx) {
-                            if (rc != BKException.Code.OK) {
-                                log.warn("[{}] Failed to delete ledger {} 
after updating schema locator failed, rc: {}",
-                                    schemaId, position.getLedgerId(), rc);
-                            }
-                        }
-                    }, null);
-                }
+        ).thenApply(ignore -> nextVersion).exceptionallyCompose(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.warn("Failed to update schema locator with position, schemaId 
{}, ledgerId {}",
+                            schemaId, position.getLedgerId(), cause);
+            if (cause instanceof AlreadyExistsException || cause instanceof 
BadVersionException) {
+                return deleteLedgerAsync(schemaId, position.getLedgerId(),
+                        "schema locator update failed")
+                        .thenCompose(__ -> FutureUtil.failedFuture(cause));
             }
+            return FutureUtil.failedFuture(cause);
         });
     }
 
+    private CompletableFuture<Void> deleteLedgerAsync(String schemaId, long 
ledgerId, String reason) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+                if (rc != BKException.Code.OK) {
+                    log.warn()
+                            .attr("schemaId", schemaId)
+                            .attr("ledgerId", ledgerId)
+                            .attr("rc", rc)
+                            .attr("reason", reason)
+                            .log("Failed to delete orphan schema ledger");
+                }
+                future.complete(null);
+            }, null);
+        } catch (Throwable t) {
+            log.warn()
+                    .attr("schemaId", schemaId)
+                    .attr("ledgerId", ledgerId)
+                    .attr("reason", reason)
+                    .exception(t)
+                    .log("Failed to trigger orphan schema ledger deletion");
+            future.complete(null);
+        }
+        return future;
+    }
+
     @NonNull
     private CompletableFuture<SchemaEntry> findSchemaEntryByVersion(
         List<IndexEntry> index,
@@ -809,4 +825,4 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             throw t instanceof CompletionException ? (CompletionException) t : 
new CompletionException(t);
         });
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 23a61636072..fa0d3bdc9b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -55,6 +55,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -1386,48 +1388,96 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         final String topic = getTopicName(ns, 
"testConcurrentCreateSchemaNoOrphanLedger");
         final String schemaName = TopicName.get(topic).getSchemaName();
 
-        org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
-                (org.apache.bookkeeper.client.PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+        PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
 
         // Concurrently create producers with the same schema on a brand-new 
topic
         int concurrency = 16;
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = 
createProducersInParallel(
+                topic, Schema.AVRO(Schemas.PersonOne.class), concurrency);
+        try {
+            FutureUtil.waitForAll(producers).join();
+
+            // Verify only 1 schema version exists
+            assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+            int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName);
+            assertEquals(schemaLedgerCount, 1,
+                    "Expected exactly 1 schema ledger for the topic, but found 
"
+                            + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+        } finally {
+            closeProducers(producers);
+        }
+    }
+
+    /**
+     * Test that concurrent compatible schema updates clean up ledgers created 
by requests
+     * that lose the schema locator CAS race.
+     */
+    @Test
+    public void testConcurrentUpdateSchemaNoOrphanLedger() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, 
"testConcurrentUpdateSchemaNoOrphanLedger");
+        final String schemaName = TopicName.get(topic).getSchemaName();
+        PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) 
pulsar.getBookKeeperClient();
+
+        @Cleanup
+        Producer<Schemas.PersonOne> initialProducer = pulsarClient
+                .newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic)
+                .create();
+        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+        int concurrency = 16;
+        List<CompletableFuture<Producer<Schemas.PersonThree>>> producers = 
createProducersInParallel(
+                topic, Schema.AVRO(Schemas.PersonThree.class), concurrency);
+        try {
+            FutureUtil.waitForAll(producers).join();
+
+            assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
+            int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName);
+            assertEquals(schemaLedgerCount, 2,
+                    "Expected exactly 2 schema ledgers for the topic, but 
found "
+                            + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+        } finally {
+            closeProducers(producers);
+        }
+    }
+
+    private <T> List<CompletableFuture<Producer<T>>> createProducersInParallel(
+            String topic, Schema<T> schema, int concurrency) throws 
InterruptedException {
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
-                Collections.synchronizedList(new ArrayList<>(concurrency));
+        List<CompletableFuture<Producer<T>>> producers = 
Collections.synchronizedList(new ArrayList<>(concurrency));
         CountDownLatch latch = new CountDownLatch(concurrency);
         for (int i = 0; i < concurrency; i++) {
             executor.execute(() -> {
                 try {
-                    
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
-                            .topic(topic).createAsync());
+                    
producers.add(pulsarClient.newProducer(schema).topic(topic).createAsync());
                 } finally {
                     latch.countDown();
                 }
             });
         }
         latch.await();
-        FutureUtil.waitForAll(producers).join();
-
-        // Verify only 1 schema version exists
-        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+        return producers;
+    }
 
-        // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" 
matches this topic's schemaName.
-        // If orphan ledgers were not cleaned up, there would be more than 1.
+    private int countSchemaLedgers(PulsarMockBookKeeper mockBk, String 
schemaName) {
         int schemaLedgerCount = 0;
-        for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : 
mockBk.getLedgerMap().values()) {
+        for (PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) {
             Map<String, byte[]> metadata = 
lh.getLedgerMetadata().getCustomMetadata();
             byte[] schemaIdBytes = 
metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID);
-            if (schemaIdBytes != null
-                    && schemaName.equals(new String(schemaIdBytes, 
java.nio.charset.StandardCharsets.UTF_8))) {
+            if (schemaIdBytes != null && schemaName.equals(new 
String(schemaIdBytes, StandardCharsets.UTF_8))) {
                 schemaLedgerCount++;
             }
         }
-        assertEquals(schemaLedgerCount, 1,
-                "Expected exactly 1 schema ledger for the topic, but found "
-                        + schemaLedgerCount + ". Orphan ledgers were not 
cleaned up.");
+        return schemaLedgerCount;
+    }
 
-        // Cleanup
+    private <T> void closeProducers(List<CompletableFuture<Producer<T>>> 
producers) {
         producers.forEach(p -> {
             try {
                 p.join().close();

Reply via email to