This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d5ce1cee353 [improve] [broker] Avoid subscription fenced error with
consumer.seek whenever possible (#23163)
d5ce1cee353 is described below
commit d5ce1cee35363ba2372375c2e8740be6d87488d8
Author: fengyubiao <[email protected]>
AuthorDate: Wed Aug 14 16:39:55 2024 +0800
[improve] [broker] Avoid subscription fenced error with consumer.seek
whenever possible (#23163)
---
.../service/persistent/PersistentSubscription.java | 32 +++++++----
.../broker/service/SubscriptionSeekTest.java | 65 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 0a57f98eb7a..f59ea18ce8e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -132,6 +132,7 @@ public class PersistentSubscription extends
AbstractSubscription {
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
+ private volatile CompletableFuture<Void> inProgressResetCursorFuture;
static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
@@ -220,6 +221,16 @@ public class PersistentSubscription extends
AbstractSubscription {
@Override
public CompletableFuture<Void> addConsumer(Consumer consumer) {
+ CompletableFuture<Void> inProgressResetCursorFuture =
this.inProgressResetCursorFuture;
+ if (inProgressResetCursorFuture != null) {
+ return inProgressResetCursorFuture.handle((ignore, ignoreEx) ->
null)
+ .thenCompose(ignore -> addConsumerInternal(consumer));
+ } else {
+ return addConsumerInternal(consumer);
+ }
+ }
+
+ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
return pendingAckHandle.pendingAckHandleFuture().thenCompose(future ->
{
synchronized (PersistentSubscription.this) {
cursor.updateLastActive();
@@ -775,7 +786,8 @@ public class PersistentSubscription extends
AbstractSubscription {
} else {
finalPosition = position.getNext();
}
- resetCursor(finalPosition, future);
+ CompletableFuture<Void> resetCursorFuture =
resetCursor(finalPosition);
+ FutureUtil.completeAfter(future, resetCursorFuture);
}
@Override
@@ -794,18 +806,13 @@ public class PersistentSubscription extends
AbstractSubscription {
}
@Override
- public CompletableFuture<Void> resetCursor(Position position) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- resetCursor(position, future);
- return future;
- }
-
- private void resetCursor(Position finalPosition, CompletableFuture<Void>
future) {
+ public CompletableFuture<Void> resetCursor(Position finalPosition) {
if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this,
FALSE, TRUE)) {
- future.completeExceptionally(new SubscriptionBusyException("Failed
to fence subscription"));
- return;
+ return CompletableFuture.failedFuture(new
SubscriptionBusyException("Failed to fence subscription"));
}
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ inProgressResetCursorFuture = future;
final CompletableFuture<Void> disconnectFuture;
// Lock the Subscription object before locking the Dispatcher object
to avoid deadlocks
@@ -825,6 +832,7 @@ public class PersistentSubscription extends
AbstractSubscription {
if (throwable != null) {
log.error("[{}][{}] Failed to disconnect consumer from
subscription", topicName, subName, throwable);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
+ inProgressResetCursorFuture = null;
future.completeExceptionally(
new SubscriptionBusyException("Failed to disconnect
consumers from subscription"));
return;
@@ -864,6 +872,7 @@ public class PersistentSubscription extends
AbstractSubscription {
dispatcher.afterAckMessages(null, finalPosition);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this,
FALSE);
+ inProgressResetCursorFuture = null;
future.complete(null);
}
@@ -872,6 +881,7 @@ public class PersistentSubscription extends
AbstractSubscription {
log.error("[{}][{}] Failed to reset subscription to
position {}", topicName, subName,
finalPosition, exception);
IS_FENCED_UPDATER.set(PersistentSubscription.this,
FALSE);
+ inProgressResetCursorFuture = null;
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof
InvalidCursorPositionException) {
@@ -886,10 +896,12 @@ public class PersistentSubscription extends
AbstractSubscription {
}).exceptionally((e) -> {
log.error("[{}][{}] Error while resetting cursor", topicName,
subName, e);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
+ inProgressResetCursorFuture = null;
future.completeExceptionally(new BrokerServiceException(e));
return null;
});
});
+ return future;
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index fd08f284bbf..3fc795a8c3e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -34,12 +34,14 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -50,8 +52,13 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
@@ -781,6 +788,64 @@ public class SubscriptionSeekTest extends BrokerTestBase {
assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 +
msgInTopic1Partition2) * 2);
}
+ @Test
+ public void testSeekWillNotEncounteredFencedError() throws Exception {
+ String topicName = "persistent://prop/ns-abc/my-topic2";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topicPolicies().setRetention(topicName, new
RetentionPolicies(3600, 0));
+ // Create a pulsar client with a subscription fenced counter.
+ ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString());
+ AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
+ PulsarClient client =
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
+ new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+ protected void handleError(CommandError error) {
+ if (error.getMessage() != null &&
error.getMessage().contains("Subscription is fenced")) {
+ receivedFencedErrorCounter.incrementAndGet();
+ }
+ super.handleError(error);
+ }
+ });
+
+ // publish some messages.
+ org.apache.pulsar.client.api.Consumer<String> consumer =
client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("s1")
+ .subscribe();
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topicName).create();
+ MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0");
+ for (int i = 1; i < 11; i++) {
+ admin.topics().unload(topicName);
+ producer.send(i + "");
+ }
+
+ // Inject a delay for reset-cursor.
+ mockZooKeeper.delay(3000, (op, path) -> {
+ if
(path.equals("/managed-ledgers/prop/ns-abc/persistent/my-topic2/s1")) {
+ return op.toString().equalsIgnoreCase("SET");
+ }
+ return false;
+ });
+
+ // Verify: consumer will not receive "subscription fenced" error after
a seek.
+ for (int i = 1; i < 11; i++) {
+ Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+ consumer.seek(msgId1);
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(consumer.isConnected());
+ });
+ assertEquals(receivedFencedErrorCounter.get(), 0);
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ client.close();
+ admin.topics().delete(topicName);
+ }
+
@Test
public void testExceptionBySeekFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" +
UUID.randomUUID();