This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c3727d34405 [fix][client]Producer stuck or geo-replication stuck due 
to wrong value of message.numMessagesInBatch (#25106)
c3727d34405 is described below

commit c3727d34405e7e5492d5b5042c6ee4f512d2e453
Author: fengyubiao <[email protected]>
AuthorDate: Wed Dec 24 17:31:45 2025 +0800

    [fix][client]Producer stuck or geo-replication stuck due to wrong value of 
message.numMessagesInBatch (#25106)
    
    (cherry picked from commit ab65faa12ab7279a726411152af44d81b6a6704b)
---
 .../client/api/SimpleProducerConsumerTest.java     | 65 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  9 ++-
 2 files changed, 71 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 560f458f4ec..c553950014f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -112,10 +113,12 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
@@ -5406,4 +5409,66 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         // cleanup
         admin.topics().delete(topic, false);
     }
+
+    /**
+     * The internal producer of the replicator will resend messages after 
reconnecting. This test guarantees that the
+     * internal producer keeps resending messages even when the 
client side encounters the following bugs.
+     * - A client-side issue causes `message.metadata.numMessagesInBatch` 
to be `0`, such as
+     *   https://github.com/streamnative/pulsar-rs/issues/376.
+     * - Before the fix, the resend mechanism relied on 
`message.metadata.numMessagesInBatch`; after the fix, the
+     *   producer only cares about whether there are pending messages.
+     * see also https://github.com/apache/pulsar/pull/25106.
+     */
+    @Test
+    public void testResendMessagesWhichNumMessagesInBatchIsZero() throws 
Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+        final String subscriptionName = "s1";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscriptionName, 
MessageId.earliest);
+
+        // Create a producer which can be paused to publish.
+        AtomicBoolean stuckProducerReconnection = new AtomicBoolean(false);
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        PulsarClient client = 
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
+            new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+                protected void handleProducerSuccess(CommandProducerSuccess 
success) {
+                    if (stuckProducerReconnection.get()) {
+                        synchronized (stuckProducerReconnection) {
+                            super.handleProducerSuccess(success);
+                        }
+                    } else {
+                        super.handleProducerSuccess(success);
+                    }
+                }
+            });
+        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) 
client.newProducer().topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false).create();
+
+        // Trigger a resending by unloading topics.
+        AtomicReference<CompletableFuture<MessageId>> latestPublishing = new 
AtomicReference<>();
+        synchronized (stuckProducerReconnection) {
+            stuckProducerReconnection.set(true);
+            admin.topics().unload(topic);
+            for (int i = 0; i < 10; i++) {
+                ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
+                MessageMetadata messageMetadata = new MessageMetadata();
+                messageMetadata.setUncompressedSize(1);
+                MessageImpl<byte[]> message1 = MessageImpl.create(topic, null, 
messageMetadata, payload,
+                        Optional.empty(), null, Schema.BYTES, 0, true, 0);
+                // Mock bugs, which publish messages with 0 numMessagesInBatch.
+                message1.getMessageBuilder().setNumMessagesInBatch(0);
+                latestPublishing.set(producer1.sendAsync(message1));
+            }
+            stuckProducerReconnection.set(false);
+        }
+
+        // Verify: no messages being stuck.
+        latestPublishing.get().get(10, TimeUnit.SECONDS);
+
+        // cleanup.
+        producer1.close();
+        client.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a7fe1dc9c60..1f95aa6391a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -56,7 +56,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -1762,7 +1761,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
      */
     protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
         @VisibleForTesting
-        final Queue<OpSendMsg> delegate = new ArrayDeque<>();
+        final ArrayDeque<OpSendMsg> delegate = new ArrayDeque<>();
         private int forEachDepth = 0;
         private List<OpSendMsg> postponedOpSendMgs;
         private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1817,6 +1816,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return messagesCount.get();
         }
 
+        public int size() {
+            return delegate.size();
+        }
+
         @Override
         public Iterator<OpSendMsg> iterator() {
             Iterator<OpSendMsg> delegateIterator = delegate.iterator();
@@ -2129,7 +2132,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 }
 
                 int messagesToResend = pendingMessages.messagesCount();
-                if (messagesToResend == 0) {
+                if (pendingMessages.size() == 0) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] [{}] No pending messages to resend 
{}", topic, producerName, messagesToResend);
                     }

Reply via email to