This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 31f1ff801fc [improve][client] Test no exception could be thrown for
invalid epoch in message (#25013)
31f1ff801fc is described below
commit 31f1ff801fc6d82ae704350dd47043793ebabc23
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 25 18:24:38 2025 +0800
[improve][client] Test no exception could be thrown for invalid epoch in
message (#25013)
(cherry picked from commit 67dafa19399fd5be28b18b4d320d99cfd93409e1)
---
.../apache/pulsar/client/impl/MockMessageTest.java | 113 +++++++++++++++++++++
.../pulsar/client/util/ExecutorProvider.java | 16 ++-
2 files changed, 127 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java
new file mode 100644
index 00000000000..92e598e2446
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class MockMessageTest extends ProducerConsumerBase {
+
+ private final Map<Thread, List<Throwable>> threadFailures = new ConcurrentHashMap<>();
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testMessageWithWrongEpoch() throws Exception {
+ threadFailures.clear();
+ final var conf = new ClientConfigurationData();
+ conf.setServiceUrl(pulsar.getBrokerServiceUrl());
+ @Cleanup final var client = PulsarClientImpl.builder().conf(conf)
+ .internalExecutorProvider(new ExecutorProvider(1, "internal", false,
+ this::newThreadFactory))
+ .externalExecutorProvider(new ExecutorProvider(1, "external", false))
+ .build();
+
+ final var topic = "test-message-with-wrong-epoch";
+ @Cleanup final var consumer = (ConsumerImpl<byte[]>) client.newConsumer()
+ .topic(topic).subscriptionName("sub").poolMessages(true).subscribe();
+
+ final var cnx = consumer.cnx();
+ consumer.redeliverUnacknowledgedMessages(); // increase the consumer epoch
+ Assert.assertEquals(consumer.consumerEpoch, 1L);
+ final BiConsumer<Long, String> sendMessage = (epoch, value) -> {
+ cnx.ctx().executor().execute(() -> {
+ final var cmd = new BaseCommand();
+ cmd.copyFrom(Commands.newMessageCommand(consumer.consumerId, 0L, 0L, 0, 0, null, epoch));
+ final var metadata = new MessageMetadata().setPublishTime(System.currentTimeMillis())
+ .setProducerName("producer").setSequenceId(0).clearNumMessagesInBatch();
+ final var buffer = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, metadata,
+ Unpooled.wrappedBuffer(value.getBytes()));
+ cnx.handleMessage(cmd.getMessage(), buffer);
+ });
+ };
+ sendMessage.accept(0L, "msg-0"); // 0 is an old epoch that will be rejected
+ sendMessage.accept(1L, "msg-1");
+
+ final var msg = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(msg.getValue(), "msg-1".getBytes(StandardCharsets.UTF_8));
+ Assert.assertTrue(threadFailures.isEmpty());
+ }
+
+ private ExecutorProvider.ExtendedThreadFactory newThreadFactory(String poolName, boolean daemon) {
+ return new ExecutorProvider.ExtendedThreadFactory(poolName, daemon) {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ final var thread = super.newThread(r);
+ thread.setUncaughtExceptionHandler((t, e) -> {
+ log.error("Unexpected exception in {}", t.getName(), e);
+ threadFailures.computeIfAbsent(t, __ -> new CopyOnWriteArrayList<>()).add(e);
+ });
+ return thread;
+ }
+ };
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 88654c51300..1fa0c166707 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.util;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.List;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
@@ -61,13 +63,23 @@ public class ExecutorProvider {
}
public ExecutorProvider(int numThreads, String poolName) {
+ this(numThreads, poolName, Thread.currentThread().isDaemon());
+ }
+
+ public ExecutorProvider(int numThreads, String poolName, boolean daemon) {
+ this(numThreads, poolName, daemon, ExtendedThreadFactory::new);
+ }
+
+ @VisibleForTesting
+ public ExecutorProvider(
+ int numThreads, String poolName, boolean daemon,
+ BiFunction<String/* poolName */, Boolean/* daemon */, ExtendedThreadFactory> threadFactoryCreator) {
checkArgument(numThreads > 0);
this.numThreads = numThreads;
Objects.requireNonNull(poolName);
executors = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
- ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
- poolName, Thread.currentThread().isDaemon());
+ ExtendedThreadFactory threadFactory = threadFactoryCreator.apply(poolName, daemon);
ExecutorService executor = createExecutor(threadFactory);
executors.add(Pair.of(executor, threadFactory));
}