This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new dec0a9dbd1f [fix][client]Producer stuck or geo-replication stuck due
to wrong value of message.numMessagesInBatch (#25106)
dec0a9dbd1f is described below
commit dec0a9dbd1fa3a500180a916d7dab9d1fdb6cd42
Author: fengyubiao <[email protected]>
AuthorDate: Wed Dec 24 17:31:45 2025 +0800
[fix][client]Producer stuck or geo-replication stuck due to wrong value of
message.numMessagesInBatch (#25106)
(cherry picked from commit ab65faa12ab7279a726411152af44d81b6a6704b)
---
.../client/api/SimpleProducerConsumerTest.java | 66 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 9 ++-
2 files changed, 72 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index b76328f252f..d5b10fb8347 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -100,6 +100,8 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -111,10 +113,12 @@ import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
@@ -5433,4 +5437,66 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
// cleanup
admin.topics().delete(topic, false);
}
+
+ /**
+ * After reconnecting, the replicator's internal producer resends its pending messages. This test (using a regular
+ * producer in place of the replicator's) guarantees the resend still happens when a client-side bug leaves
+ * `message.metadata.numMessagesInBatch` at `0`, e.g. https://github.com/streamnative/pulsar-rs/issues/376.
+ * - Before the fix, the resend path checked `pendingMessages.messagesCount()`, which (per this fix) depends on
+ * `numMessagesInBatch`, so it could conclude there was nothing to resend and leave the producer stuck.
+ * - After the fix, it only checks whether the pending queue has any entries (`pendingMessages.size()`).
+ * See also https://github.com/apache/pulsar/pull/25106.
+ */
+ @Test
+ public void testResendMessagesWhichNumMessagesInBatchIsZero() throws
Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+ final String subscriptionName = "s1";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createSubscription(topic, subscriptionName,
MessageId.earliest);
+
+ // Create a client whose CommandProducerSuccess handling blocks on the stuckProducerReconnection monitor while the flag is true, so the producer's reconnection can be held back.
+ AtomicBoolean stuckProducerReconnection = new AtomicBoolean(false);
+ ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString());
+ PulsarClient client =
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
+ new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+ protected void handleProducerSuccess(CommandProducerSuccess
success) { // NOTE(review): consider adding @Override
+ if (stuckProducerReconnection.get()) {
+ synchronized (stuckProducerReconnection) { // waits until the test thread releases this monitor
+ super.handleProducerSuccess(success);
+ }
+ } else {
+ super.handleProducerSuccess(success);
+ }
+ }
+ });
+ ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>)
client.newProducer().topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false).create();
+
+ // Unload the topic while holding the monitor: the reconnection cannot complete, so the messages sent below must be resent afterwards.
+ AtomicReference<CompletableFuture<MessageId>> latestPublishing = new
AtomicReference<>();
+ synchronized (stuckProducerReconnection) {
+ stuckProducerReconnection.set(true);
+ admin.topics().unload(topic);
+ for (int i = 0; i < 10; i++) {
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setUncompressedSize(1); // NOTE(review): payload has capacity 1 but no bytes are written — confirm this mismatch is intended
+ MessageImpl<byte[]> message1 = MessageImpl.create(topic, null,
messageMetadata, payload,
+ Optional.empty(), null, Schema.BYTES, 0, true, 0);
+ // Simulate the client-side bug: force numMessagesInBatch to 0.
+ message1.getMessageBuilder().setNumMessagesInBatch(0);
+ latestPublishing.set(producer1.sendAsync(message1));
+ }
+ stuckProducerReconnection.set(false); // leaving the synchronized block next unblocks handleProducerSuccess
+ }
+
+ // Verify: the last send completes within 10 seconds, i.e. no message is stuck.
+ latestPublishing.get().get(10, TimeUnit.SECONDS);
+
+ // Cleanup. NOTE(review): not in a finally block, so producer1 and client are not closed if the wait above throws.
+ producer1.close();
+ client.close();
+ admin.topics().delete(topic, false);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index bf749ef2b34..589f7ef5ea7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -56,7 +56,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -1781,7 +1780,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
*/
protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
@VisibleForTesting
- final Queue<OpSendMsg> delegate = new ArrayDeque<>();
+ final ArrayDeque<OpSendMsg> delegate = new ArrayDeque<>();
private int forEachDepth = 0;
private List<OpSendMsg> postponedOpSendMgs;
private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1836,6 +1835,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return messagesCount.get();
}
+ public int size() { // number of queued OpSendMsg entries; unlike messagesCount(), never 0 while entries are queued
+ return delegate.size();
+ }
+
@Override
public Iterator<OpSendMsg> iterator() {
Iterator<OpSendMsg> delegateIterator = delegate.iterator();
@@ -2148,7 +2151,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
int messagesToResend = pendingMessages.messagesCount();
- if (messagesToResend == 0) {
+ if (pendingMessages.size() == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] No pending messages to resend
{}", topic, producerName, messagesToResend);
}