This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6856a1523c43da16b30be978d169b3c3215140b7 Author: fengyubiao <[email protected]> AuthorDate: Mon May 12 19:15:25 2025 +0800 [fix][client] Fix producer publishing getting stuck after message with incompatible schema is discarded (#24282) Fixes #24262 Main Issue: #24262 ### Motivation The issue is a regression of https://github.com/apache/pulsar/pull/24178. The issue occurs as follows: - Publish msg 1, which has an incompatible schema - Publish msg 2, which also has an incompatible schema - The first message's schema fails to register because it is incompatible - The first message is discarded. - Issue: the producer does not trigger schema registration for the following messages, so publishing gets stuck ### Modifications After a message with an incompatible schema is discarded, trigger schema registration for the following pending messages instead of leaving the producer stuck; the producer stays paused only when `pauseSendingToPreservePublishOrderOnSchemaRegFailure` is enabled. (cherry picked from commit df2c619b14d277d8df82ce9a0ab1ab146cf51b7f) --- .../java/org/apache/pulsar/schema/SchemaTest.java | 20 +++++++++++++++----- .../org/apache/pulsar/client/impl/ProducerImpl.java | 15 +++++++++++---- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 6ba1e029673..84d3c52a6be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -1526,8 +1526,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { producer.close(); } - // This test fails consistently, disabling until it is fixed. 
Issue https://github.com/apache/pulsar/issues/24262 - @Test(enabled = false) + @Test public void testPendingQueueSizeIfIncompatible() throws Exception { final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns"); admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME)); @@ -1536,17 +1535,28 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { admin.topics().createNonPartitionedTopic(topic); ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) - .maxPendingMessages(50).enableBatching(false).topic(topic).create(); - producer.newMessage(Schema.STRING).value("msg").sendAsync(); + .maxPendingMessages(1000).enableBatching(false).topic(topic).create(); + producer.newMessage(Schema.STRING).value("msg-1").sendAsync(); AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>(); for (int i = 0; i < 100; i++) { - latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync()); + final String msg = "msg-with-broken-schema-" + i; + latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v -> { + log.info("send complete {}", msg); + return null; + }).exceptionally(ex -> { + log.error("failed to send {}", msg, ex); + return null; + })); } + // Verify: msgs with broken schema will be discarded. Awaitility.await().untilAsserted(() -> { assertTrue(latestSend.get().isDone()); assertEquals(producer.getPendingQueueSize(), 0); }); + // Verify: msgs with compatible schema can be sent successfully. + producer.newMessage(Schema.STRING).value("msg-2").sendAsync(); + // cleanup. 
producer.close(); admin.topics().delete(topic, false); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index f27208844b7..89bee6cccc6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -2447,6 +2447,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne * 3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all following * publishing to avoid out-of-order issue. * 3-1-2. Otherwise, discard the failed message anc continuously publishing the following messages. + * Additionally, the following messages may need schema registration also. * 3-2. The new schema registration failed due to other error, retry registering. * Note: Since the current method accesses & modifies {@link #pendingMessages}, you should acquire a lock on * {@link ProducerImpl} before calling method. @@ -2465,6 +2466,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne Iterator<OpSendMsg> msgIterator = pendingMessages.iterator(); MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema; OpSendMsg loopEndDueToSchemaRegisterNeeded = null; + boolean pausedSendingToPreservePublishOrderOnSchemaRegFailure = false; while (msgIterator.hasNext()) { OpSendMsg op = msgIterator.next(); if (loopStartAt != null) { @@ -2509,6 +2511,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne + " 2) Unload topic on target cluster. Schema details: {}", topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false)); loopEndDueToSchemaRegisterNeeded = op; + pausedSendingToPreservePublishOrderOnSchemaRegFailure = true; break; } // Event 3-1-2. 
@@ -2564,7 +2567,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } cnx.ctx().flush(); - // "Event 1-1" or "Event 3-1-1" or "Event 3-2". + // "Event 1-1" or "Event 3-1-1" or "Event 3-1-2" or "Event 3-2". if (loopEndDueToSchemaRegisterNeeded != null) { if (compareAndSetState(State.Connecting, State.Ready)) { // "Event 1-1" happens after "Event 3-1-1". @@ -2572,15 +2575,19 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // after users changed the compatibility strategy to make the schema is compatible. tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback, expectedEpoch); - } else if (!failedIncompatibleSchema && compareAndSetState(State.RegisteringSchema, State.Ready)) { - // "Event 2-1" or "Event 3-2". + } else if (pausedSendingToPreservePublishOrderOnSchemaRegFailure) { + // Nothing to do if the event is "Event 3-1-1", just keep stuck. + return; + } else if (compareAndSetState(State.RegisteringSchema, State.Ready)) { + // "Event 2-1" or "Event 3-1-2" or "Event 3-2". // "pendingMessages" has more messages to register new schema. // This operation will not be conflict with another schema registration because both operations are // attempt to acquire the same lock "ProducerImpl.this". tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback, expectedEpoch); } - // Nothing to do if the event is "Event 3-1-1", just keep stuck. + // Schema registration will trigger a new "recoverProcessOpSendMsgFrom", so return here. If failed to switch + // state, it means another task will trigger a new "recoverProcessOpSendMsgFrom". return; } else if (latestMsgAttemptedRegisteredSchema != null) { // Event 2-2 or "Event 3-1-2".
