This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new dec0a9dbd1f [fix][client]Producer stuck or geo-replication stuck due 
to wrong value of message.numMessagesInBatch (#25106)
dec0a9dbd1f is described below

commit dec0a9dbd1fa3a500180a916d7dab9d1fdb6cd42
Author: fengyubiao <[email protected]>
AuthorDate: Wed Dec 24 17:31:45 2025 +0800

    [fix][client]Producer stuck or geo-replication stuck due to wrong value of 
message.numMessagesInBatch (#25106)
    
    (cherry picked from commit ab65faa12ab7279a726411152af44d81b6a6704b)
---
 .../client/api/SimpleProducerConsumerTest.java     | 66 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  9 ++-
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index b76328f252f..d5b10fb8347 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -100,6 +100,8 @@ import 
org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -111,10 +113,12 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
@@ -5433,4 +5437,66 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         // cleanup
         admin.topics().delete(topic, false);
     }
+
+    /**
+     * The internal producer of replicator will resend messages after 
reconnecting. This test guarantees that the
+     * internal producer will keep resending messages even though the 
client side encounters the following bugs.
+     * - The client side issue causes `message.metadata.numMessagesInBatch` 
being `0`, such as
+     *   https://github.com/streamnative/pulsar-rs/issues/376.
+     * - Before the fix, the resend mechanism relied on 
`message.metadata.numMessagesInBatch`; after the fix, the
+     *   producer only cares about whether there are pending messages.
+     * See also https://github.com/apache/pulsar/pull/25106.
+     */
+    @Test
+    public void testResendMessagesWhichNumMessagesInBatchIsZero() throws 
Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+        final String subscriptionName = "s1";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscriptionName, 
MessageId.earliest);
+
+        // Create a producer whose handleProducerSuccess can be blocked.
+        AtomicBoolean stuckProducerReconnection = new AtomicBoolean(false);
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        PulsarClient client = 
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
+            new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+                protected void handleProducerSuccess(CommandProducerSuccess 
success) {
+                    if (stuckProducerReconnection.get()) {
+                        synchronized (stuckProducerReconnection) {
+                            super.handleProducerSuccess(success);
+                        }
+                    } else {
+                        super.handleProducerSuccess(success);
+                    }
+                }
+            });
+        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) 
client.newProducer().topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false).create();
+
+        // Trigger a resend by unloading the topic while holding the lock.
+        AtomicReference<CompletableFuture<MessageId>> latestPublishing = new 
AtomicReference<>();
+        synchronized (stuckProducerReconnection) {
+            stuckProducerReconnection.set(true);
+            admin.topics().unload(topic);
+            for (int i = 0; i < 10; i++) {
+                ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
+                MessageMetadata messageMetadata = new MessageMetadata();
+                messageMetadata.setUncompressedSize(1);
+                MessageImpl<byte[]> message1 = MessageImpl.create(topic, null, 
messageMetadata, payload,
+                        Optional.empty(), null, Schema.BYTES, 0, true, 0);
+                // Simulate the bug: publish with numMessagesInBatch set to 0.
+                message1.getMessageBuilder().setNumMessagesInBatch(0);
+                latestPublishing.set(producer1.sendAsync(message1));
+            }
+            stuckProducerReconnection.set(false);
+        }
+
+        // Verify: the last send completes, so no message is stuck.
+        latestPublishing.get().get(10, TimeUnit.SECONDS);
+
+        // cleanup.
+        producer1.close();
+        client.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index bf749ef2b34..589f7ef5ea7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -56,7 +56,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -1781,7 +1780,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
      */
     protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
         @VisibleForTesting
-        final Queue<OpSendMsg> delegate = new ArrayDeque<>();
+        final ArrayDeque<OpSendMsg> delegate = new ArrayDeque<>();
         private int forEachDepth = 0;
         private List<OpSendMsg> postponedOpSendMgs;
         private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1836,6 +1835,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return messagesCount.get();
         }
 
+        public int size() {
+            return delegate.size();
+        }
+
         @Override
         public Iterator<OpSendMsg> iterator() {
             Iterator<OpSendMsg> delegateIterator = delegate.iterator();
@@ -2148,7 +2151,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 }
 
                 int messagesToResend = pendingMessages.messagesCount();
-                if (messagesToResend == 0) {
+                if (pendingMessages.size() == 0) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] [{}] No pending messages to resend 
{}", topic, producerName, messagesToResend);
                     }

Reply via email to