This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 08555f05b31 [fix][broker] Wait for orphan schema ledger cleanup before retry (#25579)
08555f05b31 is described below
commit 08555f05b3168d9e75832c12f84973d34903a6c8
Author: sinan liu <[email protected]>
AuthorDate: Mon Apr 27 11:30:24 2026 +0800
[fix][broker] Wait for orphan schema ledger cleanup before retry (#25579)
---
.../service/schema/BookkeeperSchemaStorage.java | 85 +++++++++++----------
.../java/org/apache/pulsar/schema/SchemaTest.java | 88 +++++++++++++++++-----
2 files changed, 114 insertions(+), 59 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index ea6537e1e1e..cf9c3467ae6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -355,37 +354,32 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
addNewSchemaEntryToStore(schemaId,
Collections.singletonList(emptyIndex), data);
return stored.thenCompose(position -> {
- // The schema was stored in the ledger, now update the z-node with
the pointer to it
+ // Step 2: Create the schema locator z-node pointing to the ledger
SchemaStorageFormat.IndexEntry info =
SchemaStorageFormat.IndexEntry.newBuilder()
.setVersion(0)
.setPosition(position)
.setHash(copyFrom(hash))
.build();
- CompletableFuture<Long> created =
createSchemaLocator(getSchemaPath(schemaId),
- SchemaStorageFormat.SchemaLocator.newBuilder()
+ SchemaStorageFormat.SchemaLocator locator =
SchemaStorageFormat.SchemaLocator.newBuilder()
.setInfo(info)
- .addAllIndex(
- newArrayList(info))
- .build())
- .thenApply(ignore -> 0L);
-
- return created.whenComplete((__, ex) -> {
- if (ex == null) {
- return;
- }
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- log.warn("[{}] Failed to create schema locator with ledger
{}", schemaId, position.getLedgerId(),
- cause);
- if (cause instanceof AlreadyExistsException || cause
instanceof BadVersionException) {
- bookKeeper.asyncDeleteLedger(position.getLedgerId(), (rc,
ctx) -> {
- if (rc != BKException.Code.OK) {
- log.warn("[{}] Failed to delete orphan ledger {}
after schema locator creation failed, "
- + "rc: {}", schemaId,
position.getLedgerId(), rc);
+ .addAllIndex(newArrayList(info))
+ .build();
+ // Step 3: Handle failure by cleaning up the orphan ledger
+ // if concurrent schema creation caused a CAS conflict
+ return createSchemaLocator(getSchemaPath(schemaId), locator)
+ .thenApply(ignore -> 0L)
+ .exceptionallyCompose(ex -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ log.warn("[{}] Failed to create schema locator with
ledger {}", schemaId,
+ position.getLedgerId(), cause);
+ if (cause instanceof AlreadyExistsException || cause
instanceof BadVersionException) {
+ return deleteLedgerAsync(schemaId,
position.getLedgerId(),
+ "schema locator creation failed")
+ .thenCompose(__ ->
FutureUtil.failedFuture(cause));
}
- }, null);
- }
- });
+ return FutureUtil.failedFuture(cause);
+ });
});
}
@@ -496,26 +490,37 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
.addAllIndex(indexList)
.build()
, locatorEntry.version
- ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
- if (ex != null) {
- Throwable cause = FutureUtil.unwrapCompletionException(ex);
- log.warn("[{}] Failed to update schema locator with ledger
{}", schemaId, position.getLedgerId(),
- cause);
- if (cause instanceof AlreadyExistsException || cause
instanceof BadVersionException) {
- bookKeeper.asyncDeleteLedger(position.getLedgerId(), new
AsyncCallback.DeleteCallback() {
- @Override
- public void deleteComplete(int rc, Object ctx) {
- if (rc != BKException.Code.OK) {
- log.warn("[{}] Failed to delete ledger {}
after updating schema locator failed, rc: {}",
- schemaId, position.getLedgerId(), rc);
- }
- }
- }, null);
- }
+ ).thenApply(ignore -> nextVersion).exceptionallyCompose(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.warn("[{}] Failed to update schema locator with ledger {}",
schemaId, position.getLedgerId(),
+ cause);
+ if (cause instanceof AlreadyExistsException || cause instanceof
BadVersionException) {
+ return deleteLedgerAsync(schemaId, position.getLedgerId(),
+ "schema locator update failed")
+ .thenCompose(__ -> FutureUtil.failedFuture(cause));
}
+ return FutureUtil.failedFuture(cause);
});
}
+ private CompletableFuture<Void> deleteLedgerAsync(String schemaId, long
ledgerId, String reason) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ log.warn("[{}] Failed to delete ledger {} after {}, rc:
{}",
+ schemaId, ledgerId, reason, rc);
+ }
+ future.complete(null);
+ }, null);
+ } catch (Throwable t) {
+ log.warn("[{}] Failed to delete ledger {} after {}",
+ schemaId, ledgerId, reason, t);
+ future.complete(null);
+ }
+ return future;
+ }
+
@NonNull
private CompletableFuture<SchemaStorageFormat.SchemaEntry>
findSchemaEntryByVersion(
List<SchemaStorageFormat.IndexEntry> index,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 286937cc42f..781709e672e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -57,6 +57,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -1485,48 +1487,96 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
final String topic = getTopicName(ns,
"testConcurrentCreateSchemaNoOrphanLedger");
final String schemaName = TopicName.get(topic).getSchemaName();
- org.apache.bookkeeper.client.PulsarMockBookKeeper mockBk =
- (org.apache.bookkeeper.client.PulsarMockBookKeeper)
pulsar.getBookKeeperClient();
+ PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper)
pulsar.getBookKeeperClient();
// Concurrently create producers with the same schema on a brand-new
topic
int concurrency = 16;
+ List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
createProducersInParallel(
+ topic, Schema.AVRO(Schemas.PersonOne.class), concurrency);
+ try {
+ FutureUtil.waitForAll(producers).join();
+
+ // Verify only 1 schema version exists
+ assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+ int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName);
+ assertEquals(schemaLedgerCount, 1,
+ "Expected exactly 1 schema ledger for the topic, but found
"
+ + schemaLedgerCount + ". Orphan ledgers were not
cleaned up.");
+ } finally {
+ closeProducers(producers);
+ }
+ }
+
+ /**
+ * Test that concurrent compatible schema updates clean up ledgers created
by requests
+ * that lose the schema locator CAS race.
+ */
+ @Test
+ public void testConcurrentUpdateSchemaNoOrphanLedger() throws Exception {
+ final String namespace = "test-namespace-" + randomName(16);
+ String ns = PUBLIC_TENANT + "/" + namespace;
+ admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+ final String topic = getTopicName(ns,
"testConcurrentUpdateSchemaNoOrphanLedger");
+ final String schemaName = TopicName.get(topic).getSchemaName();
+ PulsarMockBookKeeper mockBk = (PulsarMockBookKeeper)
pulsar.getBookKeeperClient();
+
+ @Cleanup
+ Producer<Schemas.PersonOne> initialProducer = pulsarClient
+ .newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic)
+ .create();
+ assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+
+ int concurrency = 16;
+ List<CompletableFuture<Producer<Schemas.PersonThree>>> producers =
createProducersInParallel(
+ topic, Schema.AVRO(Schemas.PersonThree.class), concurrency);
+ try {
+ FutureUtil.waitForAll(producers).join();
+
+ assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
+ int schemaLedgerCount = countSchemaLedgers(mockBk, schemaName);
+ assertEquals(schemaLedgerCount, 2,
+ "Expected exactly 2 schema ledgers for the topic, but
found "
+ + schemaLedgerCount + ". Orphan ledgers were not
cleaned up.");
+ } finally {
+ closeProducers(producers);
+ }
+ }
+
+ private <T> List<CompletableFuture<Producer<T>>> createProducersInParallel(
+ String topic, Schema<T> schema, int concurrency) throws
InterruptedException {
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(concurrency);
- List<CompletableFuture<Producer<Schemas.PersonOne>>> producers =
- Collections.synchronizedList(new ArrayList<>(concurrency));
+ List<CompletableFuture<Producer<T>>> producers =
Collections.synchronizedList(new ArrayList<>(concurrency));
CountDownLatch latch = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.execute(() -> {
try {
-
producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
- .topic(topic).createAsync());
+
producers.add(pulsarClient.newProducer(schema).topic(topic).createAsync());
} finally {
latch.countDown();
}
});
}
latch.await();
- FutureUtil.waitForAll(producers).join();
-
- // Verify only 1 schema version exists
- assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+ return producers;
+ }
- // Count surviving BK ledgers whose customMetadata "pulsar/schemaId"
matches this topic's schemaName.
- // If orphan ledgers were not cleaned up, there would be more than 1.
+ private int countSchemaLedgers(PulsarMockBookKeeper mockBk, String
schemaName) {
int schemaLedgerCount = 0;
- for (org.apache.bookkeeper.client.PulsarMockLedgerHandle lh :
mockBk.getLedgerMap().values()) {
+ for (PulsarMockLedgerHandle lh : mockBk.getLedgerMap().values()) {
Map<String, byte[]> metadata =
lh.getLedgerMetadata().getCustomMetadata();
byte[] schemaIdBytes =
metadata.get(LedgerMetadataUtils.METADATA_PROPERTY_SCHEMAID);
- if (schemaIdBytes != null
- && schemaName.equals(new String(schemaIdBytes,
java.nio.charset.StandardCharsets.UTF_8))) {
+ if (schemaIdBytes != null && schemaName.equals(new
String(schemaIdBytes, StandardCharsets.UTF_8))) {
schemaLedgerCount++;
}
}
- assertEquals(schemaLedgerCount, 1,
- "Expected exactly 1 schema ledger for the topic, but found "
- + schemaLedgerCount + ". Orphan ledgers were not
cleaned up.");
+ return schemaLedgerCount;
+ }
- // Cleanup
+ private <T> void closeProducers(List<CompletableFuture<Producer<T>>>
producers) {
producers.forEach(p -> {
try {
p.join().close();