codelipenghui commented on code in PR #17318:
URL: https://github.com/apache/pulsar/pull/17318#discussion_r990778655
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java:
##########
@@ -193,4 +194,42 @@ public void
testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}
+
+ @Test
+ public void testBatchReceiveAckTimeout()
+ throws PulsarAdminException, PulsarClientException,
InterruptedException {
+ String topicName = newTopicName();
+ int numPartitions = 2;
+ int numMessages = 100000;
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+ Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
Review Comment:
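Please annotate the producer with `@Cleanup` (as is already done for the consumer below) so that it is closed when the test finishes.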
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java:
##########
@@ -193,4 +194,42 @@ public void
testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}
+
+ @Test
+ public void testBatchReceiveAckTimeout()
+ throws PulsarAdminException, PulsarClientException,
InterruptedException {
+ String topicName = newTopicName();
+ int numPartitions = 2;
+ int numMessages = 100000;
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+ Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
+ .topic(topicName)
+ .enableBatching(false)
+ .blockIfQueueFull(true)
+ .create();
+
+ @Cleanup
+ Consumer<Long> consumer = pulsarClient
+ .newConsumer(Schema.INT64)
+ .topic(topicName)
+ .receiverQueueSize(numMessages)
+ .batchReceivePolicy(
+
BatchReceivePolicy.builder().maxNumMessages(1).timeout(2,
TimeUnit.SECONDS).build()
+ ).ackTimeout(1000, TimeUnit.MILLISECONDS)
+ .subscriptionName(methodName)
+ .subscribe();
+
+ producer.newMessage()
+ .value(1l)
+ .send();
+ Thread.sleep(300);
Review Comment:
Why do we need `sleep` here?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1890,64 +1858,62 @@ public int numMessagesInQueue() {
return incomingMessages.size();
}
- @Override
- public void redeliverUnacknowledgedMessages() {
- // First : synchronized in order to handle consumer reconnect produce
race condition, when broker receive
- // redeliverUnacknowledgedMessages and consumer have not be created and
- // then receive reconnect epoch change the broker is smaller than the
client epoch, this will cause client epoch
- // smaller than broker epoch forever. client will not receive message
anymore.
- // Second : we should synchronized `ClientCnx cnx = cnx()` to
- // prevent use old cnx to send redeliverUnacknowledgedMessages to a
old broker
- synchronized (ConsumerImpl.this) {
- ClientCnx cnx = cnx();
- // V1 don't support redeliverUnacknowledgedMessages
- if (cnx != null && cnx.getRemoteEndpointProtocolVersion() <
ProtocolVersion.v2.getValue()) {
- if ((getState() == State.Connecting)) {
- log.warn("[{}] Client Connection needs to be established "
- + "for redelivery of unacknowledged messages",
this);
- } else {
- log.warn("[{}] Reconnecting the client to redeliver the
messages.", this);
- cnx.ctx().close();
- }
+ public CompletableFuture<Void> internalRedeliverUnacknowledgedMessages() {
+ return CompletableFuture.runAsync(() -> {
+ // First : synchronized in order to handle consumer reconnect
produce race condition, when broker receive
+ // redeliverUnacknowledgedMessages and consumer have not be
created and then receive reconnect epoch
+ // change the broker is smaller than the client epoch, this will
cause client epoch smaller
+ // than broker epoch forever. client will not receive message
anymore.
+ // Second : we should synchronized `ClientCnx cnx = cnx()` to
prevent use old cnx to
+ // send redeliverUnacknowledgedMessages to a old broker
+ synchronized (ConsumerImpl.this) {
+ ClientCnx cnx = cnx();
+ // V1 don't support redeliverUnacknowledgedMessages
+ if (cnx != null && cnx.getRemoteEndpointProtocolVersion() <
ProtocolVersion.v2.getValue()) {
+ if ((getState() == State.Connecting)) {
+ log.warn("[{}] Client Connection needs to be
established "
+ + "for redelivery of unacknowledged messages",
this);
+ } else {
+ log.warn("[{}] Reconnecting the client to redeliver
the messages.", this);
+ cnx.ctx().close();
+ }
- return;
- }
+ return;
+ }
- // clear local message
- int currentSize = 0;
- currentSize = incomingMessages.size();
- clearIncomingMessages();
- unAckedMessageTracker.clear();
-
- // we should increase epoch every time, because
MultiTopicsConsumerImpl also increase it,
- // we need to keep both epochs the same
- if (conf.getSubscriptionType() == SubscriptionType.Failover
- || conf.getSubscriptionType() ==
SubscriptionType.Exclusive) {
- CONSUMER_EPOCH.incrementAndGet(this);
- }
- // is channel is connected, we should send redeliver command to
broker
- if (cnx != null && isConnected(cnx)) {
-
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
- consumerId, CONSUMER_EPOCH.get(this)),
cnx.ctx().voidPromise());
- if (currentSize > 0) {
- increaseAvailablePermits(cnx, currentSize);
+ // we should increase epoch every time, because
MultiTopicsConsumerImpl also increase it,
+ // we need to keep both epochs the same
+ if (conf.getSubscriptionType() == SubscriptionType.Failover
+ || conf.getSubscriptionType() ==
SubscriptionType.Exclusive) {
+ CONSUMER_EPOCH.incrementAndGet(this);
}
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] [{}] Redeliver unacked messages and
send {} permits", subscription, topic,
- consumerName, currentSize);
+ // clear local message
+ int currentSize = incomingMessages.size();
+ clearIncomingMessages();
+ unAckedMessageTracker.clear();
+ // is channel is connected, we should send redeliver command
to broker
+ if (cnx != null && isConnected(cnx)) {
+
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
+ consumerId, CONSUMER_EPOCH.get(this)),
cnx.ctx().voidPromise());
+ if (currentSize > 0) {
+ increaseAvailablePermits(cnx, currentSize);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] [{}] Redeliver unacked messages
and send {} permits", subscription, topic,
+ consumerName, currentSize);
+ }
+ } else {
+ log.warn("[{}] Send redeliver messages command but the
client is reconnect or close, "
+ + "so don't need to send redeliver command to
broker", this);
}
- } else {
- log.warn("[{}] Send redeliver messages command but the client
is reconnect or close, "
- + "so don't need to send redeliver command to broker",
this);
}
- }
+ }, internalPinnedExecutor);
}
- public int clearIncomingMessagesAndGetMessageNumber() {
- int messagesNumber = incomingMessages.size();
- clearIncomingMessages();
- unAckedMessageTracker.clear();
- return messagesNumber;
+ @SneakyThrows
+ @Override
+ public void redeliverUnacknowledgedMessages() {
+ internalRedeliverUnacknowledgedMessages().get();
Review Comment:
We should unwrap the `ExecutionException` that is introduced by the `get()`
operation.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -695,17 +656,20 @@ private ConsumerConfigurationData<T>
getInternalConsumerConfig() {
return internalConsumerConfig;
}
+ @SneakyThrows
@Override
public void redeliverUnacknowledgedMessages() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
internalPinnedExecutor.execute(() -> {
CONSUMER_EPOCH.incrementAndGet(this);
consumers.values().stream().forEach(consumer -> {
- consumer.redeliverUnacknowledgedMessages();
+
futures.add(consumer.internalRedeliverUnacknowledgedMessages());
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();
});
+ FutureUtil.waitForAll(futures).get();
Review Comment:
We should unwrap the `ExecutionException` that is introduced by the `get()`
operation.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -695,17 +656,20 @@ private ConsumerConfigurationData<T>
getInternalConsumerConfig() {
return internalConsumerConfig;
}
+ @SneakyThrows
@Override
public void redeliverUnacknowledgedMessages() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
Review Comment:
```suggestion
List<CompletableFuture<Void>> futures = new
ArrayList<>(consumers.size());
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java:
##########
@@ -309,21 +303,110 @@ public void testRedeliveryAddEpoch(boolean enableBatch)
throws Exception{
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
- Field field =
consumer.getClass().getDeclaredField("connectionHandler");
- field.setAccessible(true);
- ConnectionHandler connectionHandler = (ConnectionHandler)
field.get(consumer);
-
- field =
connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
- field.setAccessible(true);
-
+ ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();
- ((ConsumerImpl<String>) consumer).grabCnx();
+ consumer.grabCnx();
+
message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}
+ @Test(dataProvider = "enableBatch")
+ public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws
Exception {
+ final String topic = "testRedeliveryAddEpochAndPermits";
+ final String subName = "my-sub";
+ // set receive queue size is 4, and first send 4 messages,
+ // then call redeliver messages, assert receive msg num.
+ int receiveQueueSize = 4;
+ ConsumerImpl<String> consumer = ((ConsumerImpl<String>)
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .receiverQueueSize(receiveQueueSize)
+ .autoScaledReceiverQueueSizeEnabled(false)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe());
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(enableBatch)
+ .create();
+
+ consumer.setConsumerEpoch(1);
+ for (int i = 0; i < receiveQueueSize; i++) {
+ producer.send("pulsar" + i);
+ }
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ consumer.redeliverUnacknowledgedMessages();
+ for (int i = 0; i < receiveQueueSize; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals("pulsar" + i, msg.getValue());
+ }
+ }
+
+ @Test(dataProvider = "enableBatch")
+ public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws
Exception{
Review Comment:
Looks like we missed the consumer epoch test for the partitioned topic?
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java:
##########
@@ -309,21 +303,110 @@ public void testRedeliveryAddEpoch(boolean enableBatch)
throws Exception{
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
- Field field =
consumer.getClass().getDeclaredField("connectionHandler");
- field.setAccessible(true);
- ConnectionHandler connectionHandler = (ConnectionHandler)
field.get(consumer);
-
- field =
connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
- field.setAccessible(true);
-
+ ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();
- ((ConsumerImpl<String>) consumer).grabCnx();
+ consumer.grabCnx();
+
message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}
+ @Test(dataProvider = "enableBatch")
+ public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws
Exception {
+ final String topic = "testRedeliveryAddEpochAndPermits";
+ final String subName = "my-sub";
+ // set receive queue size is 4, and first send 4 messages,
+ // then call redeliver messages, assert receive msg num.
+ int receiveQueueSize = 4;
+ ConsumerImpl<String> consumer = ((ConsumerImpl<String>)
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .receiverQueueSize(receiveQueueSize)
+ .autoScaledReceiverQueueSizeEnabled(false)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe());
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(enableBatch)
+ .create();
+
+ consumer.setConsumerEpoch(1);
+ for (int i = 0; i < receiveQueueSize; i++) {
+ producer.send("pulsar" + i);
+ }
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ consumer.redeliverUnacknowledgedMessages();
+ for (int i = 0; i < receiveQueueSize; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals("pulsar" + i, msg.getValue());
+ }
+ }
+
+ @Test(dataProvider = "enableBatch")
+ public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws
Exception{
+ final String topic = "testBatchReceiveRedeliveryAddEpoch";
+ final String subName = "my-sub";
+ ConsumerImpl<String> consumer = ((ConsumerImpl<String>)
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName)
+ .batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000,
TimeUnit.MILLISECONDS).build())
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe());
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(enableBatch)
+ .create();
Review Comment:
Please close the producer and consumer after the test is done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]