This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2c2b75edf3c [fix][broker] Fix calculate avg message per entry (#17046) 2c2b75edf3c is described below commit 2c2b75edf3c33a1258c070e12e7d175ae83a1c66 Author: Cong Zhao <zhaoc...@apache.org> AuthorDate: Wed Aug 17 09:02:35 2022 +0800 [fix][broker] Fix calculate avg message per entry (#17046) --- .../org/apache/pulsar/broker/service/Consumer.java | 21 ++-- .../service/plugin/EntryFilterProducerTest.java | 62 +++++++++++ .../pulsar/broker/stats/ConsumerStatsTest.java | 124 +++++++++++++++++++-- 3 files changed, 186 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 20f3d3f74d8..5add5829174 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -290,13 +290,16 @@ public class Consumer { return writePromise; } int unackedMessages = totalMessages; - // Note - // Must ensure that the message is written to the pendingAcks before sent is first, because this consumer - // is possible to disconnect at this time. - if (pendingAcks != null) { - for (int i = 0; i < entries.size(); i++) { - Entry entry = entries.get(i); - if (entry != null) { + int totalEntries = 0; + + for (int i = 0; i < entries.size(); i++) { + Entry entry = entries.get(i); + if (entry != null) { + totalEntries++; + // Note + // Must ensure that the message is written to the pendingAcks before sent is first, + // because this consumer is possible to disconnect at this time. + if (pendingAcks != null) { int batchSize = batchSizes.getBatchSize(i); int stickyKeyHash = getStickyKeyHash(entry); long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId())); @@ -317,10 +320,10 @@ public class Consumer { // calculate avg message per entry if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1 // set init value. - avgMessagesPerEntry.set(1.0 * totalMessages / entries.size()); + avgMessagesPerEntry.set(1.0 * totalMessages / totalEntries); } else { avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent - + (1 - avgPercent) * totalMessages / entries.size()); + + (1 - avgPercent) * totalMessages / totalEntries); } // reduce permit and increment unackedMsg count with total number of messages in batch-msgs diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java new file mode 100644 index 00000000000..5973b7fe54b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.plugin; + + +import java.util.Collections; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.service.Consumer; + +@Slf4j +public class EntryFilterProducerTest implements EntryFilter { + @Override + public FilterResult filterEntry(Entry entry, FilterContext context) { + if (context.getMsgMetadata() == null) { + return FilterResult.ACCEPT; + } + Consumer consumer = context.getConsumer(); + Map<String, String> metadata = consumer != null ? consumer.getMetadata() : Collections.emptyMap(); + log.info("filterEntry for {}", metadata); + String matchValueAccept = metadata.getOrDefault("matchValueAccept", "ACCEPT"); + String matchValueReject = metadata.getOrDefault("matchValueReject", "REJECT"); + String matchValueReschedule = metadata.getOrDefault("matchValueReschedule", "RESCHEDULE"); + // filter by string + String producerName = context.getMsgMetadata().getProducerName(); + if (matchValueAccept.equalsIgnoreCase(producerName)) { + log.info("metadata {} producerName {} outcome ACCEPT", metadata, producerName); + return FilterResult.ACCEPT; + } else if (matchValueReject.equalsIgnoreCase(producerName)){ + log.info("metadata {} producerName {} outcome REJECT", metadata, producerName); + return FilterResult.REJECT; + } else if (matchValueReschedule.equalsIgnoreCase(producerName)){ + log.info("metadata {} producerName {} outcome RESCHEDULE", metadata, producerName); + return FilterResult.RESCHEDULE; + } else { + log.info("metadata {} producerName {} outcome ??", metadata, producerName); + } + return null; + } + + @Override + public void close() { + + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 106a40084d1..e845e6e71fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -18,44 +18,65 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertNotEquals; +import static org.testng.AssertJUnit.assertEquals; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest; +import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.io.ByteArrayOutputStream; -import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - @Slf4j @Test(groups = "broker") public class ConsumerStatsTest extends ProducerConsumerBase { @@ -343,4 +364,83 @@ public class ConsumerStatsTest extends ProducerConsumerBase { Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D); } } + + @Override + protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception { + return new PulsarService(conf) { + @Override + protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { + return spy(new BrokerService(this, ioEventLoopGroup)); + } + }; + } + + @Test + public void testAvgMessagesPerEntry() throws Exception { + final String topic = "persistent://public/default/testFilterState"; + String subName = "sub"; + + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .producerName("producer1") + .enableBatching(true).topic(topic) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(5, TimeUnit.SECONDS) + .batchingMaxBytes(Integer.MAX_VALUE) + .create(); + + producer.send("first-message"); + List<CompletableFuture<MessageId>> futures = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + futures.add(producer.sendAsync("message")); + } + FutureUtil.waitForAll(futures); + producer.close(); + + Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING) + .producerName("producer2") + .enableBatching(false).topic(topic) + .create(); + producer2.newMessage().value("producer2-message").send(); + producer2.close(); + + // mock entry filters + NarClassLoader narClassLoader = mock(NarClassLoader.class); + EntryFilter filter = new EntryFilterProducerTest(); + EntryFilterWithClassLoader + loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter, + narClassLoader); + ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("filter", loader); + BrokerService brokerService = pulsar.getBrokerService(); + doReturn(entryFilters).when(brokerService).getEntryFilters(); + + Map<String, String> metadataConsumer = new HashMap<>(); + metadataConsumer.put("matchValueAccept", "producer1"); + metadataConsumer.put("matchValueReschedule", "producer2"); + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer) + .subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + + int counter = 0; + while (true) { + Message<String> message = consumer.receive(10, TimeUnit.SECONDS); + if (message != null) { + counter++; + assertNotEquals(message.getValue(), "producer2-message"); + consumer.acknowledge(message); + } else { + break; + } + } + + assertEquals(21, counter); + + ConsumerStats consumerStats = + admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0); + + assertEquals(21, consumerStats.getMsgOutCounter()); + + // Math.round(1 * 0.9 + 0.1 * (20 / 1)) + int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry(); + assertEquals(3, avgMessagesPerEntry); + } }