This is an automated email from the ASF dual-hosted git repository. coderzc pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6d7a22bfc57bfc1d5f61463320dbb2866b132271 Author: zhou zhuohan <[email protected]> AuthorDate: Thu Apr 23 20:50:39 2026 +0800 [fix][broker] Clean up orphan ledger on concurrent initial schema creation in BookkeeperSchemaStorage (#25514) --- .../mledger/impl/LedgerMetadataUtils.java | 30 +++++----- .../service/schema/BookkeeperSchemaStorage.java | 32 +++++++++-- .../java/org/apache/pulsar/schema/SchemaTest.java | 65 ++++++++++++++++++++++ .../bookkeeper/client/PulsarMockBookKeeper.java | 2 +- .../bookkeeper/client/PulsarMockLedgerHandle.java | 22 ++++++-- 5 files changed, 125 insertions(+), 26 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java index 4ac409a2e9b..13157ad4e38 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java @@ -29,28 +29,28 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; */ public final class LedgerMetadataUtils { - private static final String METADATA_PROPERTY_APPLICATION = "application"; - private static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = "pulsar".getBytes(StandardCharsets.UTF_8); + public static final String METADATA_PROPERTY_APPLICATION = "application"; + public static final byte[] METADATA_PROPERTY_APPLICATION_PULSAR = "pulsar".getBytes(StandardCharsets.UTF_8); - private static final String METADATA_PROPERTY_COMPONENT = "component"; - private static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER = + public static final String METADATA_PROPERTY_COMPONENT = "component"; + public static final byte[] METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER = "managed-ledger".getBytes(StandardCharsets.UTF_8); - private static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER = + public static final byte[] METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER = "compacted-ledger".getBytes(StandardCharsets.UTF_8); - private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = "schema".getBytes(StandardCharsets.UTF_8); + public static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = "schema".getBytes(StandardCharsets.UTF_8); - private static final byte[] METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET = + public static final byte[] METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET = "delayed-index-bucket".getBytes(StandardCharsets.UTF_8); - private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger"; - private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor"; - private static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic"; - private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo"; - private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId"; + public static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger"; + public static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor"; + public static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic"; + public static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo"; + public static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId"; - private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = "pulsar/delayedIndexBucketKey"; - private static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = "pulsar/delayedIndexTopic"; - private static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = "pulsar/delayedIndexCursor"; + public static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = "pulsar/delayedIndexBucketKey"; + public static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = "pulsar/delayedIndexTopic"; + public static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = "pulsar/delayedIndexCursor"; /** * Build base metadata for every ManagedLedger. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index e38bf48f1fd..ea6537e1e1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -351,7 +351,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage { .setLedgerId(-1L) ).build(); - return addNewSchemaEntryToStore(schemaId, Collections.singletonList(emptyIndex), data).thenCompose(position -> { + CompletableFuture<SchemaStorageFormat.PositionInfo> stored = + addNewSchemaEntryToStore(schemaId, Collections.singletonList(emptyIndex), data); + + return stored.thenCompose(position -> { // The schema was stored in the ledger, now update the z-node with the pointer to it SchemaStorageFormat.IndexEntry info = SchemaStorageFormat.IndexEntry.newBuilder() .setVersion(0) @@ -359,12 +362,30 @@ public class BookkeeperSchemaStorage implements SchemaStorage { .setHash(copyFrom(hash)) .build(); - return createSchemaLocator(getSchemaPath(schemaId), SchemaStorageFormat.SchemaLocator.newBuilder() + CompletableFuture<Long> created = createSchemaLocator(getSchemaPath(schemaId), + SchemaStorageFormat.SchemaLocator.newBuilder() .setInfo(info) .addAllIndex( newArrayList(info)) .build()) - .thenApply(ignore -> 0L); + .thenApply(ignore -> 0L); + + return created.whenComplete((__, ex) -> { + if (ex == null) { + return; + } + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("[{}] Failed to create schema locator with ledger {}", schemaId, position.getLedgerId(), + cause); + if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { + bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc, ctx) -> { + if (rc != BKException.Code.OK) { + log.warn("[{}] Failed to delete orphan ledger {} after schema locator creation failed, " + + "rc: {}", schemaId, position.getLedgerId(), rc); + } + }, null); + } + }); }); } @@ -478,7 +499,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage { ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> { if (ex != null) { Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn("[{}] Failed to update schema locator with position {}", schemaId, position, cause); + log.warn("[{}] Failed to update schema locator with ledger {}", schemaId, position.getLedgerId(), + cause); if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) { bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() { @Override @@ -762,4 +784,4 @@ public class BookkeeperSchemaStorage implements SchemaStorage { throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t); }); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 9bf1f5e4048..286937cc42f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -57,6 +57,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; @@ -1471,6 +1472,70 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { producers2.clear(); } + /** + * Test that when multiple producers concurrently create schema for a brand-new topic, + * the orphan BookKeeper ledgers created by the losing requests are properly cleaned up. + */ + @Test + public void testConcurrentCreateSchemaNoOrphanLedger() throws Exception { + final String namespace = "test-namespace-" + randomName(16); + String ns = PUBLIC_TENANT + "/" + namespace; + admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME)); + + final String topic = getTopicName(ns, "testConcurrentCreateSchemaNoOrphanLedger"); + final String schemaName = TopicName.get(topic).getSchemaName(); + + org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk = + (org.apache.bookkeeper.client.PulsarMockBookKeeper) pulsar.getBookKeeperClient(); + + // Concurrently create producers with the same schema on a brand-new topic + int concurrency = 16; + @Cleanup("shutdownNow") + ExecutorService executor = Executors.newFixedThreadPool(concurrency); + List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = + Collections.synchronizedList(new ArrayList<>(concurrency)); + CountDownLatch latch = new CountDownLatch(concurrency); + for (int i = 0; i < concurrency; i++) { + executor.execute(() -> { + try { + producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic).createAsync()); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + FutureUtil.waitForAll(producers).join(); + + // Verify only 1 schema version exists + assertEquals(admin.schemas().getAllSchemas(topic).size(), 1); + + // Count surviving BK ledgers whose customMetadata "pulsar/schemaId" matches this topic's schemaName. + // If orphan ledgers were not cleaned up, there would be more than 1. + int schemaLedgerCount = 0; + for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) { + Map<String, byte[]> metadata = lh.getLedgerMetadata().getCustomMetadata(); + byte[] schemaIdBytes = metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID); + if (schemaIdBytes != null + && schemaName.equals(new String(schemaIdBytes, java.nio.charset.StandardCharsets.UTF_8))) { + schemaLedgerCount++; + } + } + assertEquals(schemaLedgerCount, 1, + "Expected exactly 1 schema ledger for the topic, but found " + + schemaLedgerCount + ". Orphan ledgers were not cleaned up."); + + // Cleanup + producers.forEach(p -> { + try { + p.join().close(); + } catch (Exception ignore) { + } + }); + producers.clear(); + } + @EqualsAndHashCode static class User implements Serializable { private String name; diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 54b8853fb0d..7104ded7460 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -133,7 +133,7 @@ public class PulsarMockBookKeeper extends BookKeeper { long id = sequence.getAndIncrement(); log.info("Creating ledger {}", id); PulsarMockLedgerHandle lh = - new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd); + new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd, properties); ledgers.put(id, lh); return FutureUtils.value(lh); } catch (Throwable t) { diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 2f5ded0fe01..4d1fd1380c8 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Enumeration; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -69,7 +70,14 @@ public class PulsarMockLedgerHandle extends LedgerHandle { public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException { - super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, digest, passwd), new LongVersion(0L)), + this(bk, id, digest, passwd, Collections.emptyMap()); + } + + public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, + DigestType digest, byte[] passwd, + Map<String, byte[]> customMetadata) throws GeneralSecurityException { + super(bk.getClientCtx(), id, + new Versioned<>(createMetadata(id, digest, passwd, customMetadata), new LongVersion(0L)), digest, passwd, WriteFlag.NONE); this.bk = bk; this.id = id; @@ -268,14 +276,18 @@ public class PulsarMockLedgerHandle extends LedgerHandle { return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); } - private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd) { + private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd, + Map<String, byte[]> customMetadata) { List<BookieId> ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble()); - return LedgerMetadataBuilder.create() + LedgerMetadataBuilder builder = LedgerMetadataBuilder.create() .withDigestType(digest.toApiDigestType()) .withPassword(passwd) .withId(id) - .newEnsembleEntry(0L, ensemble) - .build(); + .newEnsembleEntry(0L, ensemble); + if (customMetadata != null && !customMetadata.isEmpty()) { + builder.withCustomMetadata(customMetadata); + } + return builder.build(); } private static final Logger log = LoggerFactory.getLogger(PulsarMockLedgerHandle.class);
