This is an automated email from the ASF dual-hosted git repository. Technoboy- pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ca3e8a40730057ad374a3dfc595037333c7aca1d Author: sinan liu <[email protected]> AuthorDate: Mon Apr 27 11:30:24 2026 +0800 [fix][broker] Wait for orphan schema ledger cleanup before retry (#25579) --- .../service/schema/BookkeeperSchemaStorage.java | 88 +++++++++++++--------- .../java/org/apache/pulsar/schema/SchemaTest.java | 88 +++++++++++++++++----- 2 files changed, 121 insertions(+), 55 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 9b71daff91f..d92bd8a2bc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -39,7 +39,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.function.Function; +<<<<<<< HEAD import org.apache.bookkeeper.client.AsyncCallback; +======= +import lombok.CustomLog; +>>>>>>> f07decea04 ([fix][broker] Wait for orphan schema ledger cleanup before retry (#25579)) import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -373,28 +377,21 @@ public class BookkeeperSchemaStorage implements SchemaStorage { SchemaLocator locator = new SchemaLocator(); locator.setInfo().copyFrom(info); locator.addIndex().copyFrom(info); - CompletableFuture<Long> created = createSchemaLocator( - getSchemaPath(schemaId), locator).thenApply(ignore -> 0L); - // Step 3: Handle failure by cleaning up the orphan ledger // if concurrent schema creation caused a CAS conflict - return created.whenComplete((__, ex) -> { - if (ex == null) { - return; - } - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn("Failed to create schema locator with position, schemaId {}, ledgerId {}", - schemaId, position.getLedgerId(), ex); - if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { - bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, ctx) -> { - if (rc != BKException.Code.OK) { - log.warn("Failed to delete orphan ledger after schema locator creation" - + " failed, schemaId {}, ledgerId {}, rc: {}", - schemaId, position.getLedgerId(), rc); + return createSchemaLocator(getSchemaPath(schemaId), locator) + .thenApply(ignore -> 0L) + .exceptionallyCompose(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("Failed to create schema locator with position, schemaId {}, ledgerId {}", + schemaId, position.getLedgerId(), cause); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + return deleteLedgerAsync(schemaId, position.getLedgerId(), + "schema locator creation failed") + .thenCompose(__ -> FutureUtil.failedFuture(cause)); } - }, null); - } - }); + return FutureUtil.failedFuture(cause); + }); }); } @@ -504,26 +501,45 @@ public class BookkeeperSchemaStorage implements SchemaStorage { return updateSchemaLocator(getSchemaPath(schemaId), newLocator , locatorEntry.version - ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> { - if (ex != null) { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn("Failed to update schema locator with position, schemaId {}, ledgerId {}", - schemaId, position.getLedgerId(), ex); - if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { - bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() { - @Override - public void deleteComplete(int rc, Object ctx) { - if (rc != BKException.Code.OK) { - log.warn("[{}] Failed to delete ledger {} after updating schema locator failed, rc: {}", - schemaId, position.getLedgerId(), rc); - } - } - }, null); - } + ).thenApply(ignore -> nextVersion).exceptionallyCompose(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("Failed to update schema locator with position, schemaId {}, ledgerId {}", + schemaId, position.getLedgerId(), cause); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + return deleteLedgerAsync(schemaId, position.getLedgerId(), + "schema locator update failed") + .thenCompose(__ -> FutureUtil.failedFuture(cause)); } + return FutureUtil.failedFuture(cause); }); } + private CompletableFuture<Void> deleteLedgerAsync(String schemaId, long ledgerId, String reason) { + CompletableFuture<Void> future = new CompletableFuture<>(); + try { + bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { + if (rc != BKException.Code.OK) { + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", ledgerId) + .attr("rc", rc) + .attr("reason", reason) + .log("Failed to delete orphan schema ledger"); + } + future.complete(null); + }, null); + } catch (Throwable t) { + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", ledgerId) + .attr("reason", reason) + .exception(t) + .log("Failed to trigger orphan schema ledger deletion"); + future.complete(null); + } + return future; + } + @NonNull private CompletableFuture<SchemaEntry> findSchemaEntryByVersion( List<IndexEntry> index, @@ -809,4 +825,4 @@ public class BookkeeperSchemaStorage implements SchemaStorage { throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t); }); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 23a61636072..fa0d3bdc9b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -55,6 +55,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -1386,48 +1388,96 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { final String topic = getTopicName(ns, "testConcurrentCreateSchemaNoOrphanLedger"); final String schemaName = TopicName.get(topic).getSchemaName(); - org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk = - (org.apache.bookkeeper.client.PulsarMockBookKeeper) pulsar.getBookKeeperClient(); + PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) pulsar.getBookKeeperClient(); // Concurrently create producers with the same schema on a brand-new topic int concurrency = 16; + List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = createProducersInParallel( + topic, Schema.AVRO(Schemas.PersonOne.class), concurrency); + try { + FutureUtil.waitForAll(producers).join(); + + // Verify only 1 schema version exists + assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + + int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName); + assertEquals(schemaLedgerCount, 1, + "Expected exactly 1 schema ledger for the topic, but found " + + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + } finally { + closeProducers(producers); + } + } + + /** + * Test that concurrent compatible schema updates clean up ledgers created by requests + * that lose the schema locator CAS race. + */ + @Test + public void testConcurrentUpdateSchemaNoOrphanLedger() throws Exception { + final String namespace = "test-namespace-" + randomName(16); + String ns = PUBLIC_TENANT + "/" + namespace; + admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME)); + + final String topic = getTopicName(ns, "testConcurrentUpdateSchemaNoOrphanLedger"); + final String schemaName = TopicName.get(topic).getSchemaName(); + PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper) pulsar.getBookKeeperClient(); + + @Cleanup + Producer<Schemas.PersonOne> initialProducer = pulsarClient + .newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic) + .create(); + assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + + int concurrency = 16; + List<CompletableFuture<Producer<Schemas.PersonThree>>> producers = createProducersInParallel( + topic, Schema.AVRO(Schemas.PersonThree.class), concurrency); + try { + FutureUtil.waitForAll(producers).join(); + + assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); + int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName); + assertEquals(schemaLedgerCount, 2, + "Expected exactly 2 schema ledgers for the topic, but found " + + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + } finally { + closeProducers(producers); + } + } + + private <T> List<CompletableFuture<Producer<T>>> createProducersInParallel( + String topic, Schema<T> schema, int concurrency) throws InterruptedException { @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(concurrency); - List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = - Collections.synchronizedList(new ArrayList<>(concurrency)); + List<CompletableFuture<Producer<T>>> producers = Collections.synchronizedList(new ArrayList<>(concurrency)); CountDownLatch latch = new CountDownLatch(concurrency); for (int i = 0; i < concurrency; i++) { executor.execute(() -> { try { - producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topic).createAsync()); + producers.add(pulsarClient.newProducer(schema).topic(topic).createAsync()); } finally { latch.countDown(); } }); } latch.await(); - FutureUtil.waitForAll(producers).join(); - - // Verify only 1 schema version exists - assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + return producers; + } - // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" matches this topic's schemaName. - // If orphan ledgers were not cleaned up, there would be more than 1. + private int countSchemaLedgers(PulsarMockBookKeeper mockBk, String schemaName) { int schemaLedgerCount = 0; - for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) { + for (PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) { Map<String, byte[]> metadata = lh.getLedgerMetadata().getCustomMetadata(); byte[] schemaIdBytes = metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID); - if (schemaIdBytes != null - && schemaName.equals(new String(schemaIdBytes, java.nio.charset.StandardCharsets.UTF_8))) { + if (schemaIdBytes != null && schemaName.equals(new String(schemaIdBytes, StandardCharsets.UTF_8))) { schemaLedgerCount++; } } - assertEquals(schemaLedgerCount, 1, - "Expected exactly 1 schema ledger for the topic, but found " - + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + return schemaLedgerCount; + } - // Cleanup + private <T> void closeProducers(List<CompletableFuture<Producer<T>>> producers) { producers.forEach(p -> { try { p.join().close();
