This is an automated email from the ASF dual-hosted git repository. Technoboy- pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5747c09cc3d1cd76a8416dab94da0b85bd2497aa Author: zhou zhuohan <[email protected]> AuthorDate: Thu Apr 23 20:50:39 2026 +0800 [fix][broker] Clean up orphan ledger on concurrent initial schema creation in BookkeeperSchemaStorage (#25514) --- .../mledger/impl/LedgerMetadataUtils.java | 30 +++++----- .../service/schema/BookkeeperSchemaStorage.java | 44 +++++++++++++-- .../java/org/apache/pulsar/schema/SchemaTest.java | 65 ++++++++++++++++++++++ .../bookkeeper/client/PulsarMockBookKeeper.java | 2 +- .../bookkeeper/client/PulsarMockLedgerHandle.java | 22 ++++++-- 5 files changed, 136 insertions(+), 27 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java index 4107949de51..9e1529d0683 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java @@ -29,28 +29,28 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; */ public final class LedgerMetadataUtils { - private static final String METADATA_PROPERTY_APPLICATION = "application"; - private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = "pulsar".getBytes(StandardCharsets.UTF_8); + public static final String METADATA_PROPERTY_APPLICATION = "application"; + public static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = "pulsar".getBytes(StandardCharsets.UTF_8); - private static final String METADATA_PROPERTY_COMPONENT = "component"; - private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER = + public static final String METADATA_PROPERTY_COMPONENT = "component"; + public static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER = "managed-ledger".getBytes(StandardCharsets.UTF_8); - private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER = + public static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER = "compacted-ledger".getBytes(StandardCharsets.UTF_8); - private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = "schema".getBytes(StandardCharsets.UTF_8); + public static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = "schema".getBytes(StandardCharsets.UTF_8); - private static final byte[] METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET = + public static final byte[] METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET = "delayed-index-bucket".getBytes(StandardCharsets.UTF_8); - private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger"; - private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor"; - private static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic"; - private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo"; - private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId"; + public static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger"; + public static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor"; + public static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic"; + public static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo"; + public static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId"; - private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = "pulsar/delayedIndexBucketKey"; - private static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = "pulsar/delayedIndexTopic"; - private static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = "pulsar/delayedIndexCursor"; + public static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = "pulsar/delayedIndexBucketKey"; + public static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = "pulsar/delayedIndexTopic"; + public static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = "pulsar/delayedIndexCursor"; /** * Build base metadata for every ManagedLedger. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 74b41c1be84..3d5ee246c22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -355,23 +355,51 @@ public class BookkeeperSchemaStorage implements SchemaStorage { } private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, byte[] hash) { + // Step 1: Store the schema data into a new BookKeeper ledger IndexEntry emptyIndex = new IndexEntry(); emptyIndex.setVersion(0); emptyIndex.setHash(hash); emptyIndex.setPosition().setEntryId(-1L).setLedgerId(-1L); + CompletableFuture<PositionInfo> stored = addNewSchemaEntryToStore( + schemaId, Collections.singletonList(emptyIndex), data + ); - return addNewSchemaEntryToStore(schemaId, Collections.singletonList(emptyIndex), data).thenCompose(position -> { - // The schema was stored in the ledger, now update the z-node with the pointer to it + return stored.thenCompose(position -> { + // Step 2: Create the schema locator z-node pointing to the ledger IndexEntry info = new IndexEntry(); info.setVersion(0); info.setPosition().copyFrom(position); info.setHash(hash); - SchemaLocator locator = new SchemaLocator(); locator.setInfo().copyFrom(info); locator.addIndex().copyFrom(info); - return createSchemaLocator(getSchemaPath(schemaId), locator) - .thenApply(ignore -> 0L); + CompletableFuture<Long> created = createSchemaLocator( + getSchemaPath(schemaId), locator).thenApply(ignore -> 0L); + + // Step 3: Handle failure by cleaning up the orphan ledger + // if concurrent schema creation caused a CAS conflict + return created.whenComplete((__, ex) -> { + if (ex == null) { + return; + } + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", position.getLedgerId()) + .exception(cause) + .log("Failed to create schema locator with position"); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, ctx) -> { + if (rc != BKException.Code.OK) { + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", position.getLedgerId()) + .attr("rc", rc) + .log("Failed to delete orphan ledger after schema locator creation failed"); + } + }, null); + } + }); }); } @@ -484,7 +512,11 @@ public class BookkeeperSchemaStorage implements SchemaStorage { ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> { if (ex != null) { Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn("[{}] Failed to update schema locator with position {}", schemaId, position, cause); + log.warn() + .attr("schemaId", schemaId) + .attr("ledgerId", position.getLedgerId()) + .exception(cause) + .log("Failed to update schema locator with position"); if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 393ed0a84b1..23a61636072 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -55,6 +55,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; @@ -1372,6 +1373,70 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { producers2.clear(); } + /** + * Test that when multiple producers concurrently create schema for a brand-new topic, + * the orphan BookKeeper ledgers created by the losing requests are properly cleaned up. + */ + @Test + public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception { + final String namespace = "test-namespace-" + randomName(16); + String ns = PUBLIC_TENANT + "/" + namespace; + admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME)); + + final String topic = getTopicName(ns, "testConcurrentCreateSchemaNoOrphanLedger"); + final String schemaName = TopicName.get(topic).getSchemaName(); + + org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk = + (org.apache.bookkeeper.client.PulsarMockBookKeeper) pulsar.getBookKeeperClient(); + + // Concurrently create producers with the same schema on a brand-new topic + int concurrency = 16; + @Cleanup("shutdownNow") + ExecutorService executor = Executors.newFixedThreadPool(concurrency); + List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = + Collections.synchronizedList(new ArrayList<>(concurrency)); + CountDownLatch latch = new CountDownLatch(concurrency); + for (int i = 0; i < concurrency; i++) { + executor.execute(() -> { + try { + producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic).createAsync()); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + FutureUtil.waitForAll(producers).join(); + + // Verify only 1 schema version exists + assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + + // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" matches this topic's schemaName. + // If orphan ledgers were not cleaned up, there would be more than 1. + int schemaLedgerCount = 0; + for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) { + Map<String, byte[]> metadata = lh.getLedgerMetadata().getCustomMetadata(); + byte[] schemaIdBytes = metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID); + if (schemaIdBytes != null + && schemaName.equals(new String(schemaIdBytes, java.nio.charset.StandardCharsets.UTF_8))) { + schemaLedgerCount++; + } + } + assertEquals(schemaLedgerCount, 1, + "Expected exactly 1 schema ledger for the topic, but found " + + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + + // Cleanup + producers.forEach(p -> { + try { + p.join().close(); + } catch (Exception ignore) { + } + }); + producers.clear(); + } + @EqualsAndHashCode static class User implements Serializable { private String name; diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 58fddb07ff9..1cc7765fb79 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -137,7 +137,7 @@ public class PulsarMockBookKeeper extends BookKeeper { long id = sequence.getAndIncrement(); log.info("Creating ledger {}", id); PulsarMockLedgerHandle lh = - new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd); + new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd, properties); ledgers.put(id, lh); return FutureUtils.value(lh); } catch (Throwable t) { diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 74dab3ebbfe..a5403774133 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Enumeration; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -69,7 +70,14 @@ public class PulsarMockLedgerHandle extends LedgerHandle { public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException { - super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, digest, passwd), new LongVersion(0L)), + this(bk, id, digest, passwd, Collections.emptyMap()); + } + + public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, + DigestType digest, byte[] passwd, + Map<String, byte[]> customMetadata) throws GeneralSecurityException { + super(bk.getClientCtx(), id, + new Versioned<>(createMetadata(id, digest, passwd, customMetadata), new LongVersion(0L)), digest, passwd, WriteFlag.NONE); this.bk = bk; this.id = id; @@ -274,14 +282,18 @@ public class PulsarMockLedgerHandle extends LedgerHandle { return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); } - private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd) { + private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd, + Map<String, byte[]> customMetadata) { List<BookieId> ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble()); - return LedgerMetadataBuilder.create() + LedgerMetadataBuilder builder = LedgerMetadataBuilder.create() .withDigestType(digest.toApiDigestType()) .withPassword(passwd) .withId(id) - .newEnsembleEntry(0L, ensemble) - .build(); + .newEnsembleEntry(0L, ensemble); + if (customMetadata != null && !customMetadata.isEmpty()) { + builder.withCustomMetadata(customMetadata); + } + return builder.build(); } private static final Logger log = LoggerFactory.getLogger(PulsarMockLedgerHandle.class);
