Denovo1998 commented on code in PR #25189:
URL: https://github.com/apache/pulsar/pull/25189#discussion_r2741532283
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java:
##########
@@ -272,35 +273,41 @@ private void internalPublishMessages(TopicName topicName,
ProducerMessages reque
private CompletableFuture<Position> publishSingleMessageToPartition(String
topic, Message message) {
CompletableFuture<Position> publishResult = new CompletableFuture<>();
pulsar().getBrokerService().getTopic(topic, false)
- .thenAccept(t -> {
- // TODO: Check message backlog and fail if backlog too large.
- if (!t.isPresent()) {
- // Topic not found, and remove from owning partition list.
- publishResult.completeExceptionally(new
BrokerServiceException.TopicNotFoundException("Topic not "
- + "owned by current broker."));
- TopicName topicName = TopicName.get(topic);
-
pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName())
- .remove(topicName.getPartitionIndex());
- } else {
- try {
- ByteBuf headersAndPayload = messageToByteBuf(message);
- try {
- Topic topicObj = t.get();
- topicObj.publishMessage(headersAndPayload,
- RestMessagePublishContext.get(publishResult,
topicObj, System.nanoTime()));
- } finally {
- headersAndPayload.release();
+ .thenCompose(tOpt -> {
+ if (tOpt.isEmpty()) {
+ publishResult.completeExceptionally(
+ new
BrokerServiceException.TopicNotFoundException("Topic not "
+ + "owned by current broker."));
+ TopicName tn = TopicName.get(topic);
+
pulsar().getBrokerService().getOwningTopics().get(tn.getPartitionedTopicName())
+ .remove(tn.getPartitionIndex());
+ return CompletableFuture.completedFuture(null);
+ }
+ Topic topicObj = tOpt.get();
+ CompletableFuture<Void> backlogQuotaCheckFuture =
CompletableFuture.allOf(
+
topicObj.checkBacklogQuotaExceeded(defaultProducerName,
+
BacklogQuota.BacklogQuotaType.destination_storage),
+
topicObj.checkBacklogQuotaExceeded(defaultProducerName,
+
BacklogQuota.BacklogQuotaType.message_age));
Review Comment:
Should we pass in message.getProducerName(); here?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java:
##########
@@ -931,4 +933,80 @@ public void testProduceWithAutoConsumeSchema() throws
Exception {
}
}
+ @Test
+ public void testProduceWithBacklogQuotaSizeExceeded() throws Exception {
+ String namespaceName = testTenant + "/" + testNamespace;
+ String topicName = "persistent://" + namespaceName + "/" +
testTopicName;
+ admin.topics().createNonPartitionedTopic(topicName);
+ BacklogQuota backlogQuota = BacklogQuota.builder()
+ .limitSize(0)
+
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+ .build();
+ admin.namespaces().setBacklogQuota(namespaceName, backlogQuota,
+ BacklogQuota.BacklogQuotaType.destination_storage);
+
+ AsyncResponse asyncResponse = mock(AsyncResponse.class);
+ ProducerMessages producerMessages = new ProducerMessages();
+ String message = "[{\"payload\":\"rest-produce\"}]";
+ producerMessages.setMessages(createMessages(message));
+ topics.produceOnPersistentTopic(asyncResponse, testTenant,
testNamespace, testTopicName,
+ false, producerMessages);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(asyncResponse,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.OK.getStatusCode());
+ Object responseEntity = responseCaptor.getValue().getEntity();
+ Assert.assertTrue(responseEntity instanceof ProducerAcks);
+ ProducerAcks response = (ProducerAcks) responseEntity;
+ Assert.assertEquals(response.getMessagePublishResults().size(), 1);
+ for (int index = 0; index <
response.getMessagePublishResults().size(); index++) {
+
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(),
2);
+
Assert.assertTrue(response.getMessagePublishResults().get(index).getErrorMsg()
+ .contains("Cannot create producer on topic with backlog
quota exceeded"));
+ }
+ }
+
+ @Test
+ public void testProduceWithBacklogQuotaTimeExceeded() throws Exception {
+ pulsar.getConfiguration().setPreciseTimeBasedBacklogQuotaCheck(true);
+ String namespaceName = testTenant + "/" + testNamespace;
+ String topicName = "persistent://" + namespaceName + "/" +
testTopicName;
+ admin.topics().createNonPartitionedTopic(topicName);
+ BacklogQuota backlogQuota = BacklogQuota.builder()
+ .limitTime(1)
+
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+ .build();
+ admin.namespaces().setBacklogQuota(namespaceName, backlogQuota,
+ BacklogQuota.BacklogQuotaType.message_age);
+ admin.topics().createSubscription(topicName, "time-quota-sub",
MessageId.earliest);
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ producer.send("backlog-message");
+ Thread.sleep(2000);
Review Comment:
Perhaps it can be replaced with a "poll until quota exceeded (with timeout)"
method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java:
##########
@@ -272,35 +273,41 @@ private void internalPublishMessages(TopicName topicName,
ProducerMessages reque
private CompletableFuture<Position> publishSingleMessageToPartition(String
topic, Message message) {
CompletableFuture<Position> publishResult = new CompletableFuture<>();
pulsar().getBrokerService().getTopic(topic, false)
- .thenAccept(t -> {
- // TODO: Check message backlog and fail if backlog too large.
- if (!t.isPresent()) {
- // Topic not found, and remove from owning partition list.
- publishResult.completeExceptionally(new
BrokerServiceException.TopicNotFoundException("Topic not "
- + "owned by current broker."));
- TopicName topicName = TopicName.get(topic);
-
pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName())
- .remove(topicName.getPartitionIndex());
- } else {
- try {
- ByteBuf headersAndPayload = messageToByteBuf(message);
- try {
- Topic topicObj = t.get();
- topicObj.publishMessage(headersAndPayload,
- RestMessagePublishContext.get(publishResult,
topicObj, System.nanoTime()));
- } finally {
- headersAndPayload.release();
+ .thenCompose(tOpt -> {
+ if (tOpt.isEmpty()) {
+ publishResult.completeExceptionally(
+ new
BrokerServiceException.TopicNotFoundException("Topic not "
+ + "owned by current broker."));
+ TopicName tn = TopicName.get(topic);
+
pulsar().getBrokerService().getOwningTopics().get(tn.getPartitionedTopicName())
+ .remove(tn.getPartitionIndex());
+ return CompletableFuture.completedFuture(null);
+ }
+ Topic topicObj = tOpt.get();
+ CompletableFuture<Void> backlogQuotaCheckFuture =
CompletableFuture.allOf(
+
topicObj.checkBacklogQuotaExceeded(defaultProducerName,
+
BacklogQuota.BacklogQuotaType.destination_storage),
+
topicObj.checkBacklogQuotaExceeded(defaultProducerName,
+
BacklogQuota.BacklogQuotaType.message_age));
+ return backlogQuotaCheckFuture.thenRun(() -> {
+ ByteBuf headersAndPayload = messageToByteBuf(message);
+ try {
+ topicObj.publishMessage(headersAndPayload,
+
RestMessagePublishContext.get(publishResult, topicObj, System.nanoTime()));
+ } finally {
+ headersAndPayload.release();
+ }
+ });
+ })
+ .exceptionally(ex -> {
+ if (!publishResult.isDone()) {
+
publishResult.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
}
- } catch (Exception e) {
if (log.isDebugEnabled()) {
- log.debug("Fail to publish single messages to topic
{}: {} ",
- topicName, e.getCause());
+ log.debug("Fail to publish single message to topic {}:
{}", topic, ex.getMessage());
Review Comment:
Here you can first use FutureUtil.unwrapCompletionException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]