This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 6af3edc7d7a [fix][test] Fix flaky
SubscriptionSeekTest.testSeekWillNotEncounteredFencedError by counting
subscription is fenced only after seek (#24865)
6af3edc7d7a is described below
commit 6af3edc7d7afb64ec39aaff985c750f3144e5e97
Author: sinan liu <[email protected]>
AuthorDate: Sat Oct 18 03:22:32 2025 +0800
[fix][test] Fix flaky
SubscriptionSeekTest.testSeekWillNotEncounteredFencedError by counting
subscription is fenced only after seek (#24865)
---
.../apache/pulsar/broker/service/SubscriptionSeekTest.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 9f2b7d257a0..2b9924d1d5b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -38,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Cleanup;
@@ -1045,11 +1046,14 @@ public class SubscriptionSeekTest extends
BrokerTestBase {
// Create a pulsar client with a subscription fenced counter.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString());
AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
+ // Count switch: Default off, turn on again before seek starts.
+ final AtomicBoolean countAfterSeek = new AtomicBoolean(false);
@Cleanup
PulsarClient client =
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
protected void handleError(CommandError error) {
- if (error.getMessage() != null &&
error.getMessage().contains("Subscription is fenced")) {
+ if (error.getMessage() != null &&
error.getMessage().contains("Subscription is fenced")
+ && countAfterSeek.get()) {
receivedFencedErrorCounter.incrementAndGet();
}
super.handleError(error);
@@ -1086,10 +1090,9 @@ public class SubscriptionSeekTest extends BrokerTestBase
{
assertNotNull(msg);
consumer.acknowledge(msg);
}
+ countAfterSeek.set(true);
consumer.seek(msgId1);
- Awaitility.await().untilAsserted(() -> {
- assertTrue(consumer.isConnected());
- });
+ Awaitility.await().untilAsserted(() ->
assertTrue(consumer.isConnected()));
assertEquals(receivedFencedErrorCounter.get(), 0);
// cleanup.