This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new acac72ea03f [improve][broker][PIP-379] Add observability stats for
"draining hashes" (#23429)
acac72ea03f is described below
commit acac72ea03f7c38cab99ec011b309d5e6bb4fe9d
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Oct 10 12:46:42 2024 +0300
[improve][broker][PIP-379] Add observability stats for "draining hashes"
(#23429)
---
.../org/apache/pulsar/broker/service/Consumer.java | 7 +
.../broker/service/DrainingHashesTracker.java | 112 +++++++++
...istentStickyKeyDispatcherMultipleConsumers.java | 6 +-
.../service/persistent/PersistentSubscription.java | 20 +-
.../service/persistent/RescheduleReadHandler.java | 14 ++
.../org/apache/pulsar/broker/BrokerTestUtil.java | 78 +++++-
.../stats/AuthenticatedConsumerStatsTest.java | 57 +----
.../pulsar/broker/stats/ConsumerStatsTest.java | 276 +++++++++++++++++++--
pulsar-broker/src/test/resources/log4j2.xml | 11 +
.../pulsar/common/policies/data/ConsumerStats.java | 48 +++-
.../pulsar/common/policies/data/DrainingHash.java | 41 +++
.../common/policies/data/SubscriptionStats.java | 24 ++
.../policies/data/stats/ConsumerStatsImpl.java | 43 +++-
.../policies/data/stats/DrainingHashImpl.java | 46 ++++
.../policies/data/stats/SubscriptionStatsImpl.java | 22 ++
.../pulsar/common/util/ObjectMapperFactory.java | 3 +
16 files changed, 734 insertions(+), 74 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d25ebd0839d..bcd29d86490 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -174,6 +174,10 @@ public class Consumer {
@Setter
private volatile PendingAcksMap.PendingAcksRemoveHandler
pendingAcksRemoveHandler;
+ @Getter
+ @Setter
+ private volatile java.util.function.BiConsumer<Consumer,
ConsumerStatsImpl> drainingHashesConsumerStatsUpdater;
+
public Consumer(Subscription subscription, SubType subType, String
topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
@@ -976,6 +980,9 @@ public class Consumer {
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
}
+ if (drainingHashesConsumerStatsUpdater != null) {
+ drainingHashesConsumerStatsUpdater.accept(this, stats);
+ }
return stats;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 3521fa197a1..46762c844db 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -20,8 +20,18 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.PrimitiveIterator;
+import java.util.concurrent.ConcurrentHashMap;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.DrainingHash;
+import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl;
+import org.roaringbitmap.RoaringBitmap;
/**
* A thread-safe map to store draining hashes in the consumer.
@@ -34,6 +44,8 @@ public class DrainingHashesTracker {
private final Int2ObjectOpenHashMap<DrainingHashEntry> drainingHashes =
new Int2ObjectOpenHashMap<>();
int batchLevel;
boolean unblockedWhileBatching;
+ private final Map<ConsumerIdentityWrapper, ConsumerDrainingHashesStats>
consumerDrainingHashesStatsMap =
+ new ConcurrentHashMap<>();
/**
* Represents an entry in the draining hashes tracker.
@@ -98,6 +110,52 @@ public class DrainingHashesTracker {
}
}
+ private class ConsumerDrainingHashesStats {
+ private final RoaringBitmap drainingHashes = new RoaringBitmap();
+ long drainingHashesClearedTotal;
+
+ public synchronized void addHash(int stickyHash) {
+ drainingHashes.add(stickyHash);
+ }
+
+ public synchronized boolean clearHash(int hash) {
+ drainingHashes.remove(hash);
+ drainingHashesClearedTotal++;
+ boolean empty = drainingHashes.isEmpty();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Cleared hash {} in stats. empty={}
totalCleared={} hashes={}",
+ dispatcherName, hash, empty,
drainingHashesClearedTotal, drainingHashes.getCardinality());
+ }
+ return empty;
+ }
+
+ public synchronized void updateConsumerStats(Consumer consumer,
ConsumerStatsImpl consumerStats) {
+ int drainingHashesUnackedMessages = 0;
+ List<DrainingHash> drainingHashesStats = new ArrayList<>();
+ PrimitiveIterator.OfInt hashIterator =
drainingHashes.stream().iterator();
+ while (hashIterator.hasNext()) {
+ int hash = hashIterator.nextInt();
+ DrainingHashEntry entry = getEntry(hash);
+ if (entry == null) {
+ log.warn("[{}] Draining hash {} not found in the tracker
for consumer {}", dispatcherName, hash,
+ consumer);
+ continue;
+ }
+ int unackedMessages = entry.refCount;
+ DrainingHashImpl drainingHash = new DrainingHashImpl();
+ drainingHash.hash = hash;
+ drainingHash.unackMsgs = unackedMessages;
+ drainingHash.blockedAttempts = entry.blockedCount;
+ drainingHashesStats.add(drainingHash);
+ drainingHashesUnackedMessages += unackedMessages;
+ }
+ consumerStats.drainingHashesCount = drainingHashesStats.size();
+ consumerStats.drainingHashesClearedTotal =
drainingHashesClearedTotal;
+ consumerStats.drainingHashesUnackedMessages =
drainingHashesUnackedMessages;
+ consumerStats.drainingHashes = drainingHashesStats;
+ }
+ }
+
/**
* Interface for handling the unblocking of sticky key hashes.
*/
@@ -127,13 +185,25 @@ public class DrainingHashesTracker {
}
DrainingHashEntry entry = drainingHashes.get(stickyHash);
if (entry == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Adding and incrementing draining hash {} for
consumer id:{} name:{}", dispatcherName,
+ stickyHash, consumer.consumerId(),
consumer.consumerName());
+ }
entry = new DrainingHashEntry(consumer);
drainingHashes.put(stickyHash, entry);
+ // update the consumer specific stats
+ consumerDrainingHashesStatsMap.computeIfAbsent(new
ConsumerIdentityWrapper(consumer),
+ k -> new
ConsumerDrainingHashesStats()).addHash(stickyHash);
} else if (entry.getConsumer() != consumer) {
throw new IllegalStateException(
"Consumer " + entry.getConsumer() + " is already draining
hash " + stickyHash
+ " in dispatcher " + dispatcherName + ". Same
hash being used for consumer " + consumer
+ ".");
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Draining hash {} incrementing {} consumer
id:{} name:{}", dispatcherName, stickyHash,
+ entry.refCount + 1, consumer.consumerId(),
consumer.consumerName());
+ }
}
entry.incrementRefCount();
}
@@ -178,7 +248,17 @@ public class DrainingHashesTracker {
+ ".");
}
if (entry.decrementRefCount()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Draining hash {} removing consumer id:{}
name:{}", dispatcherName, stickyHash,
+ consumer.consumerId(), consumer.consumerName());
+ }
DrainingHashEntry removed = drainingHashes.remove(stickyHash);
+ // update the consumer specific stats
+ ConsumerDrainingHashesStats drainingHashesStats =
+ consumerDrainingHashesStatsMap.get(new
ConsumerIdentityWrapper(consumer));
+ if (drainingHashesStats != null) {
+ drainingHashesStats.clearHash(stickyHash);
+ }
if (!closing && removed.isBlocking()) {
if (batchLevel > 0) {
unblockedWhileBatching = true;
@@ -186,6 +266,11 @@ public class DrainingHashesTracker {
unblockingHandler.stickyKeyHashUnblocked(stickyHash);
}
}
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Draining hash {} decrementing {} consumer
id:{} name:{}", dispatcherName, stickyHash,
+ entry.refCount, consumer.consumerId(),
consumer.consumerName());
+ }
}
}
@@ -237,5 +322,32 @@ public class DrainingHashesTracker {
*/
public synchronized void clear() {
drainingHashes.clear();
+ consumerDrainingHashesStatsMap.clear();
+ }
+
+ /**
+ * Update the consumer specific stats to the target {@link
ConsumerStatsImpl}.
+ *
+ * @param consumer the consumer
+ * @param consumerStats the consumer stats to update the values to
+ */
+ public void updateConsumerStats(Consumer consumer, ConsumerStatsImpl
consumerStats) {
+ consumerStats.drainingHashesCount = 0;
+ consumerStats.drainingHashesClearedTotal = 0;
+ consumerStats.drainingHashesUnackedMessages = 0;
+ consumerStats.drainingHashes = Collections.emptyList();
+ ConsumerDrainingHashesStats consumerDrainingHashesStats =
+ consumerDrainingHashesStatsMap.get(new
ConsumerIdentityWrapper(consumer));
+ if (consumerDrainingHashesStats != null) {
+ consumerDrainingHashesStats.updateConsumerStats(consumer,
consumerStats);
+ }
+ }
+
+ /**
+ * Remove the consumer specific stats from the draining hashes tracker.
+ * @param consumer the consumer
+ */
+ public void consumerRemoved(Consumer consumer) {
+ consumerDrainingHashesStatsMap.remove(new
ConsumerIdentityWrapper(consumer));
}
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index df053e6d8a5..1a3e2f706cb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -157,6 +157,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
drainingHashesTracker.endBatch();
}
});
+
consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats);
registerDrainingHashes(consumer,
impactedConsumers.orElseThrow());
}
}).exceptionally(ex -> {
@@ -193,6 +194,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
// consumer to another. This will handle the case where a hash
gets switched from an existing
// consumer to another existing consumer during removal.
registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
+ drainingHashesTracker.consumerRemoved(consumer);
}
}
@@ -349,8 +351,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
return false;
}
if (log.isDebugEnabled()) {
- log.debug("[{}] Adding {}:{} to pending acks for consumer {} with
sticky key hash {}",
- getName(), ledgerId, entryId, consumer, stickyKeyHash);
+ log.debug("[{}] Adding {}:{} to pending acks for consumer id:{}
name:{} with sticky key hash {}",
+ getName(), ledgerId, entryId, consumer.consumerId(),
consumer.consumerName(), stickyKeyHash);
}
// allow adding the message to pending acks and sending the message to
the consumer
return true;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index eaa147b81b1..df1c23cbbcb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1253,11 +1253,23 @@ public class PersistentSubscription extends
AbstractSubscription {
subStats.lastConsumedTimestamp =
Math.max(subStats.lastConsumedTimestamp,
consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp =
Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
- if (consumerKeyHashRanges != null &&
consumerKeyHashRanges.containsKey(consumer)) {
- consumerStats.keyHashRanges =
consumerKeyHashRanges.get(consumer).stream()
- .map(Range::toString)
- .collect(Collectors.toList());
+ List<Range> keyRanges = consumerKeyHashRanges != null ?
consumerKeyHashRanges.get(consumer) : null;
+ if (keyRanges != null) {
+ if (((StickyKeyDispatcher) dispatcher).isClassic()) {
+ // Use string representation for classic mode
+ consumerStats.keyHashRanges = keyRanges.stream()
+ .map(Range::toString)
+ .collect(Collectors.toList());
+ } else {
+ // Use array representation for PIP-379 stats
+ consumerStats.keyHashRangeArrays = keyRanges.stream()
+ .map(range -> new int[]{range.getStart(),
range.getEnd()})
+ .collect(Collectors.toList());
+ }
}
+ subStats.drainingHashesCount +=
consumerStats.drainingHashesCount;
+ subStats.drainingHashesClearedTotal +=
consumerStats.drainingHashesClearedTotal;
+ subStats.drainingHashesUnackedMessages +=
consumerStats.drainingHashesUnackedMessages;
});
subStats.filterProcessedMsgCount =
dispatcher.getFilterProcessedMsgCount();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
index 3554f292552..4812be58cdc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
+import lombok.extern.slf4j.Slf4j;
/**
* Reschedules reads so that the possible pending read is cancelled if it's
waiting for more entries.
@@ -30,6 +31,7 @@ import java.util.function.LongSupplier;
* that should be handled. This will also batch multiple calls together to
reduce the number of
* operations.
*/
+@Slf4j
class RescheduleReadHandler {
private static final int UNSET = -1;
private static final int NO_PENDING_READ = 0;
@@ -70,15 +72,27 @@ class RescheduleReadHandler {
// are entries in the replay queue.
if (maxReadOpCount != NO_PENDING_READ &&
readOpCounterSupplier.getAsLong() == maxReadOpCount
&& hasEntriesInReplayQueue.getAsBoolean()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cancelling pending read request because
it's waiting for more entries");
+ }
cancelPendingRead.run();
}
// Re-schedule read immediately, or join the next scheduled
read
+ if (log.isDebugEnabled()) {
+ log.debug("Triggering read");
+ }
rescheduleReadImmediately.run();
};
long rescheduleDelay = readIntervalMsSupplier.getAsLong();
if (rescheduleDelay > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Scheduling after {} ms", rescheduleDelay);
+ }
executor.schedule(runnable, rescheduleDelay,
TimeUnit.MILLISECONDS);
} else {
+ if (log.isDebugEnabled()) {
+ log.debug("Running immediately");
+ }
runnable.run();
}
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index 7ed4542b250..6a41e86f893 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -21,10 +21,15 @@ package org.apache.pulsar.broker;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
@@ -37,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Stream;
+import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -46,7 +52,6 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.Mockito;
import org.slf4j.Logger;
-
/**
* Holds util methods used in test.
*/
@@ -136,6 +141,77 @@ public class BrokerTestUtil {
}
}
+ /**
+ * Logs the topic stats and internal stats for the given topic
+ * @param logger logger to use
+ * @param baseUrl Pulsar service URL
+ * @param topic topic name
+ */
+ public static void logTopicStats(Logger logger, String baseUrl, String
topic) {
+ logTopicStats(logger, baseUrl, "public", "default", topic);
+ }
+
+ /**
+ * Logs the topic stats and internal stats for the given topic
+ * @param logger logger to use
+ * @param baseUrl Pulsar service URL
+ * @param tenant tenant name
+ * @param namespace namespace name
+ * @param topic topic name
+ */
+ public static void logTopicStats(Logger logger, String baseUrl, String
tenant, String namespace, String topic) {
+ String topicStatsUri =
+ String.format("%s/admin/v2/persistent/%s/%s/%s/stats",
baseUrl, tenant, namespace, topic);
+ logger.info("[{}] stats: {}", topic,
jsonPrettyPrint(getJsonResourceAsString(topicStatsUri)));
+ String topicStatsInternalUri =
+ String.format("%s/admin/v2/persistent/%s/%s/%s/internalStats",
baseUrl, tenant, namespace, topic);
+ logger.info("[{}] internalStats: {}", topic,
jsonPrettyPrint(getJsonResourceAsString(topicStatsInternalUri)));
+ }
+
+ /**
+ * Pretty print the given JSON string
+ * @param jsonString JSON string to pretty print
+ * @return pretty printed JSON string
+ */
+ public static String jsonPrettyPrint(String jsonString) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ Object json = mapper.readValue(jsonString, Object.class);
+ ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
+ return writer.writeValueAsString(json);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Get the resource as a string from the given URI
+ */
+ @SneakyThrows
+ public static String getJsonResourceAsString(String uri) {
+ URL url = new URL(uri);
+ HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.setRequestProperty("Accept", "application/json");
+ try {
+ int responseCode = connection.getResponseCode();
+ if (responseCode == 200) {
+ try (BufferedReader in = new BufferedReader(new
InputStreamReader(connection.getInputStream()))) {
+ String inputLine;
+ StringBuilder content = new StringBuilder();
+ while ((inputLine = in.readLine()) != null) {
+ content.append(inputLine);
+ }
+ return content.toString();
+ }
+ } else {
+ throw new IOException("Failed to get resource: " + uri + ",
status: " + responseCode);
+ }
+ } finally {
+ connection.disconnect();
+ }
+ }
+
/**
* Receive messages concurrently from multiple consumers and handles them
using the provided message handler.
* The message handler should return true if it wants to continue
receiving more messages, false otherwise.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
index e8cadb72e1e..20c1c5498ce 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
@@ -18,11 +18,19 @@
*/
package org.apache.pulsar.broker.stats;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Sets;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -37,18 +45,6 @@ import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.security.KeyPair;
-import java.security.KeyPairGenerator;
-import java.security.NoSuchAlgorithmException;
-import java.security.PrivateKey;
-import java.time.Duration;
-import java.util.Base64;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-
public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{
private final String ADMIN_TOKEN;
private final String TOKEN_PUBLIC_KEY;
@@ -115,32 +111,6 @@ public class AuthenticatedConsumerStatsTest extends
ConsumerStatsTest{
@Test
public void testConsumerStatsOutput() throws Exception {
- Set<String> allowedFields = Sets.newHashSet(
- "msgRateOut",
- "msgThroughputOut",
- "bytesOutCounter",
- "msgOutCounter",
- "messageAckRate",
- "msgRateRedeliver",
- "chunkedMessageRate",
- "consumerName",
- "availablePermits",
- "unackedMessages",
- "avgMessagesPerEntry",
- "blockedConsumerOnUnackedMsgs",
- "readPositionWhenJoining",
- "lastAckedTime",
- "lastAckedTimestamp",
- "lastConsumedTime",
- "lastConsumedTimestamp",
- "lastConsumedFlowTimestamp",
- "keyHashRanges",
- "metadata",
- "address",
- "connectedSince",
- "clientVersion",
- "appId");
-
final String topicName =
"persistent://public/default/testConsumerStatsOutput";
final String subName = "my-subscription";
@@ -154,13 +124,6 @@ public class AuthenticatedConsumerStatsTest extends
ConsumerStatsTest{
ObjectMapper mapper = ObjectMapperFactory.create();
ConsumerStats consumerStats = stats.getSubscriptions()
.get(subName).getConsumers().get(0);
- Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
- JsonNode node =
mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
- Iterator<String> itr = node.fieldNames();
- while (itr.hasNext()) {
- String field = itr.next();
- Assert.assertTrue(allowedFields.contains(field), field + " should
not be exposed");
- }
// assert that role is exposed
Assert.assertEquals(consumerStats.getAppId(), "admin");
consumer.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 5b2998216e8..59a911500e5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.broker.stats;
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.InstanceOfAssertFactories.INTEGER;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertNotEquals;
import static org.testng.AssertJUnit.assertEquals;
@@ -31,22 +34,29 @@ import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PrometheusMetricsTestUtil;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.PendingAcksMap;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.StickyKeyDispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -67,13 +77,19 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DrainingHash;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.assertj.core.groups.Tuple;
+import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.SkipException;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -218,9 +234,24 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
Assert.assertEquals(updatedStats.getBytesOutCounter(), 1280);
}
- @Test
- public void testConsumerStatsOutput() throws Exception {
- Set<String> allowedFields = Sets.newHashSet(
+ @DataProvider(name = "classicAndSubscriptionType")
+ public Object[][] classicAndSubscriptionType() {
+ return new Object[][]{
+ {false, SubscriptionType.Shared},
+ {true, SubscriptionType.Key_Shared},
+ {false, SubscriptionType.Key_Shared}
+ };
+ }
+
+ @Test(dataProvider = "classicAndSubscriptionType")
+ public void testConsumerStatsOutput(boolean classicDispatchers,
SubscriptionType subscriptionType)
+ throws Exception {
+ if (this instanceof AuthenticatedConsumerStatsTest) {
+ throw new SkipException("Skip test for
AuthenticatedConsumerStatsTest");
+ }
+
conf.setSubscriptionSharedUseClassicPersistentImplementation(classicDispatchers);
+
conf.setSubscriptionKeySharedUseClassicPersistentImplementation(classicDispatchers);
+ Set<String> expectedFields = Sets.newHashSet(
"msgRateOut",
"msgThroughputOut",
"bytesOutCounter",
@@ -233,21 +264,56 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
"unackedMessages",
"avgMessagesPerEntry",
"blockedConsumerOnUnackedMsgs",
- "readPositionWhenJoining",
"lastAckedTime",
"lastAckedTimestamp",
"lastConsumedTime",
"lastConsumedTimestamp",
"lastConsumedFlowTimestamp",
- "keyHashRanges",
"metadata",
"address",
"connectedSince",
- "clientVersion");
+ "clientVersion",
+ "drainingHashesCount",
+ "drainingHashesClearedTotal",
+ "drainingHashesUnackedMessages"
+ );
+ if (subscriptionType == SubscriptionType.Key_Shared) {
+ if (classicDispatchers) {
+ expectedFields.addAll(List.of(
+ "readPositionWhenJoining",
+ "keyHashRanges"
+ ));
+ } else {
+ expectedFields.addAll(List.of(
+ "drainingHashes",
+ "keyHashRangeArrays"
+ ));
+ }
+ }
+ final String topicName =
newUniqueName("persistent://my-property/my-ns/testConsumerStatsOutput");
+ final String subName = "my-subscription";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(subscriptionType)
+ .subscriptionName(subName)
+ .subscribe();
+
+ String topicStatsUri =
+ String.format("%s/admin/v2/%s/stats",
pulsar.getWebServiceAddress(), topicName.replace("://", "/"));
+ String topicStatsJson =
BrokerTestUtil.getJsonResourceAsString(topicStatsUri);
+ ObjectMapper mapper = ObjectMapperFactory.create();
+ JsonNode node =
mapper.readTree(topicStatsJson).get("subscriptions").get(subName).get("consumers").get(0);
+
assertThat(node.fieldNames()).toIterable().containsExactlyInAnyOrderElementsOf(expectedFields);
+ }
- final String topicName =
"persistent://prop/use/ns-abc/testConsumerStatsOutput";
+ @Test
+ public void testLastConsumerFlowTimestamp() throws PulsarClientException,
PulsarAdminException {
+ final String topicName =
newUniqueName("persistent://my-property/my-ns/testLastConsumerFlowTimestamp");
final String subName = "my-subscription";
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
@@ -255,18 +321,9 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
.subscribe();
TopicStats stats = admin.topics().getStats(topicName);
- ObjectMapper mapper = ObjectMapperFactory.create();
ConsumerStats consumerStats = stats.getSubscriptions()
.get(subName).getConsumers().get(0);
Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
- JsonNode node =
mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
- Iterator<String> itr = node.fieldNames();
- while (itr.hasNext()) {
- String field = itr.next();
- Assert.assertTrue(allowedFields.contains(field), field + " should
not be exposed");
- }
-
- consumer.close();
}
@@ -481,4 +538,189 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
assertEquals(0, consumers.get(0).getUnackedMessages());
}
+ @Test
+ public void testKeySharedDrainingHashesConsumerStats() throws Exception {
+ String topic =
newUniqueName("testKeySharedDrainingHashesConsumerStats");
+ String subscriptionName = "sub";
+ int numberOfKeys = 10;
+
+ // Create a producer for the topic
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ // Create the first consumer (c1) for the topic
+ @Cleanup
+ Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .consumerName("c1")
+ .receiverQueueSize(100)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ // Get the dispatcher and selector for the topic
+ StickyKeyDispatcher dispatcher = getDispatcher(topic,
subscriptionName);
+ StickyKeyConsumerSelector selector = dispatcher.getSelector();
+
+ // Send 20 messages with keys cycling from 0 to numberOfKeys-1
+ for (int i = 0; i < 20; i++) {
+ String key = String.valueOf(i % numberOfKeys);
+ int stickyKeyHash =
selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8));
+ log.info("Sending message with value {} key {} hash {}", key, i,
stickyKeyHash);
+ producer.newMessage()
+ .key(key)
+ .value(i)
+ .send();
+ }
+
+ // Wait until all the already published messages have been pre-fetched
by c1
+ PendingAcksMap c1PendingAcks =
dispatcher.getConsumers().get(0).getPendingAcks();
+ Awaitility.await().ignoreExceptions().until(() -> c1PendingAcks.size()
== 20);
+
+ // Add a new consumer (c2) for the topic
+ @Cleanup
+ Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .consumerName("c2")
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ // Get the subscription stats and consumer stats
+ SubscriptionStats subscriptionStats =
admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+ ConsumerStats c1Stats = subscriptionStats.getConsumers().get(0);
+ ConsumerStats c2Stats = subscriptionStats.getConsumers().get(1);
+
+ Set<Integer> c2HashesByStats = new HashSet<>();
+ Set<Integer> c2HashesByDispatcher = new HashSet<>();
+ Map<Integer, Integer> c1DrainingHashesExpected = new HashMap<>();
+
+ int expectedDrainingHashesUnackMessages = 0;
+ // Determine which hashes are assigned to c2 and which are draining
from c1
+ // run for the same keys as the sent messages
+ for (int i = 0; i < 20; i++) {
+ // use the same key as in the sent messages
+ String key = String.valueOf(i % numberOfKeys);
+ int hash =
selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8));
+ // Validate that the hash is assigned to c2 in stats
+ if ("c2".equals(findConsumerNameForHash(subscriptionStats, hash)))
{
+ c2HashesByStats.add(hash);
+ }
+ // use the selector to determine the expected draining hashes for
c1
+ org.apache.pulsar.broker.service.Consumer selected =
selector.select(hash);
+ if ("c2".equals(selected.consumerName())) {
+ c2HashesByDispatcher.add(hash);
+ c1DrainingHashesExpected.compute(hash, (k, v) -> v == null ? 1
: v + 1);
+ expectedDrainingHashesUnackMessages++;
+ }
+ }
+
+ // Validate that the hashes assigned to c2 match between stats and
dispatcher
+
assertThat(c2HashesByStats).containsExactlyInAnyOrderElementsOf(c2HashesByDispatcher);
+
+ // Validate the draining hashes for c1
+
assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash)
+ .containsExactlyInAnyOrderElementsOf(c2HashesByStats);
+
assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash,
DrainingHash::getUnackMsgs)
+
.containsExactlyInAnyOrderElementsOf(c1DrainingHashesExpected.entrySet().stream()
+ .map(e -> Tuple.tuple(e.getKey(),
e.getValue())).toList());
+
+ // Validate that c2 has no draining hashes
+ assertThat(c2Stats.getDrainingHashes()).isEmpty();
+
+ // Validate counters
+
assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(c2HashesByStats.size());
+ assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(0);
+
assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(expectedDrainingHashesUnackMessages);
+ assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0);
+ assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0);
+ assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0);
+
+ // Send another 20 messages
+ for (int i = 0; i < 20; i++) {
+ producer.newMessage()
+ .key(String.valueOf(i % numberOfKeys))
+ .value(i)
+ .send();
+ }
+
+ // Validate blocked attempts for c1
+ Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+ SubscriptionStats stats =
admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+
assertThat(stats.getConsumers().get(0).getDrainingHashes()).isNotEmpty().allSatisfy(dh
-> {
+ assertThat(dh).extracting(DrainingHash::getBlockedAttempts)
+ .asInstanceOf(INTEGER)
+ .isGreaterThan(0);
+ });
+ });
+
+ // Acknowledge messages that were sent before c2 joined, to clear all
draining hashes
+ for (int i = 0; i < 20; i++) {
+ Message<Integer> message = c1.receive(1, TimeUnit.SECONDS);
+ log.info("Acking message with value {} key {}",
message.getValue(), message.getKey());
+ c1.acknowledge(message);
+
+ if (i == 18) {
+ // Validate that there is one draining hash left
+
Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ SubscriptionStats stats =
+
admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+
assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> {
+ assertThat(consumerStats)
+ .describedAs("Consumer stats should
have one draining hash %s", consumerStats)
+
.extracting(ConsumerStats::getDrainingHashes)
+ .asList().hasSize(1);
+ });
+ });
+ }
+
+ if (i == 19) {
+ // Validate that there are no draining hashes left
+
Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ SubscriptionStats stats =
+
admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+
assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> {
+
assertThat(consumerStats).extracting(ConsumerStats::getDrainingHashes)
+ .asList().isEmpty();
+ });
+ });
+ }
+ }
+
+ // Get the subscription stats and consumer stats
+ subscriptionStats =
admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+ c1Stats = subscriptionStats.getConsumers().get(0);
+ c2Stats = subscriptionStats.getConsumers().get(1);
+
+ // Validate counters
+ assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(0);
+
assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(c2HashesByStats.size());
+ assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(0);
+ assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0);
+ assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0);
+ assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0);
+
+ }
+
+    /**
+     * Resolves the name of the consumer whose assigned key hash ranges contain the
+     * given hash (per the stats' keyHashRangeArrays), or null if no consumer matches.
+     */
+    private String findConsumerNameForHash(SubscriptionStats
subscriptionStats, int hash) {
+        return findConsumerForHash(subscriptionStats,
hash).map(ConsumerStats::getConsumerName).orElse(null);
+    }
+
+    /**
+     * Finds the first consumer in the subscription stats whose assigned key hash range
+     * contains the given hash. Ranges from getKeyHashRangeArrays are [start, end] pairs
+     * and both bounds are treated as inclusive.
+     */
+    private Optional<? extends ConsumerStats>
findConsumerForHash(SubscriptionStats subscriptionStats, int hash) {
+        return subscriptionStats.getConsumers().stream()
+                .filter(consumerStats ->
consumerStats.getKeyHashRangeArrays().stream()
+                        .anyMatch(hashRanges -> hashRanges[0] <= hash &&
hashRanges[1] >= hash))
+                .findFirst();
+    }
+
+    /**
+     * Looks up the StickyKeyDispatcher for the given topic/subscription directly from
+     * the broker service.
+     * NOTE(review): getTopicIfExists(...).get().get() assumes the topic exists — the
+     * Optional is dereferenced unchecked and @SneakyThrows hides the checked
+     * exceptions; acceptable in test code where the topic was just created.
+     */
+    @SneakyThrows
+    private StickyKeyDispatcher getDispatcher(String topic, String
subscription) {
+        return (StickyKeyDispatcher)
pulsar.getBrokerService().getTopicIfExists(topic).get()
+                .get().getSubscription(subscription).getDispatcher();
+    }
}
diff --git a/pulsar-broker/src/test/resources/log4j2.xml
b/pulsar-broker/src/test/resources/log4j2.xml
index 09a89702ee2..a0732096f28 100644
--- a/pulsar-broker/src/test/resources/log4j2.xml
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -36,5 +36,16 @@
<Root level="INFO">
<AppenderRef ref="CONSOLE"/>
</Root>
    <!-- Uncomment the following loggers for debugging Key_Shared / PIP-379
+ <Logger
name="org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers"
level="DEBUG" additivity="false">
+ <AppenderRef ref="CONSOLE"/>
+ </Logger>
+ <Logger name="org.apache.pulsar.broker.service.DrainingHashesTracker"
level="DEBUG" additivity="false">
+ <AppenderRef ref="CONSOLE"/>
+ </Logger>
+ <Logger
name="org.apache.pulsar.broker.service.persistent.RescheduleReadHandler"
level="DEBUG" additivity="false">
+ <AppenderRef ref="CONSOLE"/>
+ </Logger>
+ -->
</Loggers>
</Configuration>
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index d2d3600df96..16dce5903f4 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -73,8 +73,41 @@ public interface ConsumerStats {
boolean isBlockedConsumerOnUnackedMsgs();
/** The read position of the cursor when the consumer joining. */
+ @Deprecated
String getReadPositionWhenJoining();
+ /**
+ * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+ * Retrieves the current number of hashes in the draining state for this
consumer.
+ *
+ * @return the current number of hashes in the draining state for this
consumer
+ */
+ int getDrainingHashesCount();
+
+ /**
+ * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+ * Retrieves the total number of hashes cleared from the draining state
since the consumer connected.
+ *
+ * @return the total number of hashes cleared from the draining state
since the consumer connected
+ */
+ long getDrainingHashesClearedTotal();
+
+ /**
+ * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+ * Retrieves the total number of unacked messages for all draining hashes
for this consumer.
+ *
+ * @return the total number of unacked messages for all draining hashes
for this consumer
+ */
+ int getDrainingHashesUnackedMessages();
+
+ /**
+ * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+ * Retrieves the draining hashes for this consumer.
+ *
+ * @return a list of draining hashes for this consumer
+ */
+ List<DrainingHash> getDrainingHashes();
+
/** Address of this consumer. */
String getAddress();
@@ -88,9 +121,20 @@ public interface ConsumerStats {
long getLastConsumedTimestamp();
long getLastConsumedFlowTimestamp();
- /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
+ /**
+ * Hash ranges assigned to this consumer if in Key_Shared subscription
mode.
+ * This format and field is used when
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `false`
+ * (default).
+ */
+ List<int[]> getKeyHashRangeArrays();
+
+ /**
+ * Hash ranges assigned to this consumer if in Key_Shared subscription
mode.
+ * This format and field is used when
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`.
+ */
+ @Deprecated
List<String> getKeyHashRanges();
/** Metadata (key/value strings) associated with this consumer. */
Map<String, String> getMetadata();
-}
+}
\ No newline at end of file
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java
new file mode 100644
index 00000000000..685b0b74e64
--- /dev/null
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Contains information about a draining hash in a Key_Shared subscription.
+ * @see ConsumerStats
+ */
+public interface DrainingHash {
+ /**
+ * Get the sticky key hash value of the draining hash.
+ * @return the sticky hash value
+ */
+ int getHash();
+    /**
+     * Get the number of unacknowledged messages for the draining hash.
+     * @return the number of unacknowledged messages
+     */
+    int getUnackMsgs();
+ /**
+ * Get the number of times the hash has blocked an attempted delivery of a
message.
+ * @return number of times the hash has blocked an attempted delivery of a
message
+ */
+ int getBlockedAttempts();
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index ce3a080a855..95e7c65266b 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -121,6 +121,30 @@ public interface SubscriptionStats {
/** This is for Key_Shared subscription to get the recentJoinedConsumers
in the Key_Shared subscription. */
Map<String, String> getConsumersAfterMarkDeletePosition();
+ /**
+ * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+ * Retrieves the current number of hashes in the draining state.
+ *
+ * @return the current number of hashes in the draining state
+ */
+ int getDrainingHashesCount();
+
+ /**
+ * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+ * Retrieves the total number of hashes cleared from the draining state
for the connected consumers.
+ *
+ * @return the total number of hashes cleared from the draining state for
the connected consumers
+ */
+ long getDrainingHashesClearedTotal();
+
+ /**
+ * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+ * Retrieves the total number of unacked messages for all draining hashes.
+ *
+ * @return the total number of unacked messages for all draining hashes
+ */
+ int getDrainingHashesUnackedMessages();
+
/** SubscriptionProperties (key/value strings) associated with this
subscribe. */
Map<String, String> getSubscriptionProperties();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index de36b330b7f..8811247cb2d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Objects;
import lombok.Data;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DrainingHash;
import org.apache.pulsar.common.util.DateFormatter;
/**
@@ -80,6 +81,30 @@ public class ConsumerStatsImpl implements ConsumerStats {
/** The read position of the cursor when the consumer joining. */
public String readPositionWhenJoining;
+ /**
+ * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of
hashes in the draining state.
+ */
+ public int drainingHashesCount;
+
+ /**
+ * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of
hashes cleared from the draining state for
+ * the consumer.
+ */
+ public long drainingHashesClearedTotal;
+
+ /**
+ * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of
unacked messages for all draining hashes.
+ */
+ public int drainingHashesUnackedMessages;
+
+    /**
+     * For Key_Shared AUTO_SPLIT ordered subscriptions: The list of draining hashes for
+     * this consumer.
+     */
+    public List<DrainingHash> drainingHashes;
+
/** Address of this consumer. */
private String address;
/** Timestamp of connection. */
@@ -96,7 +121,17 @@ public class ConsumerStatsImpl implements ConsumerStats {
public long lastConsumedFlowTimestamp;
- /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
+ /**
+ * Hash ranges assigned to this consumer if in Key_Shared subscription
mode.
+ * This format and field is used when
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `false`
+ * (default).
+ */
+ public List<int[]> keyHashRangeArrays;
+
+ /**
+ * Hash ranges assigned to this consumer if in Key_Shared subscription
mode.
+ * This format and field is used when
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`.
+ */
public List<String> keyHashRanges;
/** Metadata (key/value strings) associated with this consumer. */
@@ -114,6 +149,12 @@ public class ConsumerStatsImpl implements ConsumerStats {
this.unackedMessages += stats.unackedMessages;
this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
this.readPositionWhenJoining = stats.readPositionWhenJoining;
+ this.drainingHashesCount = stats.drainingHashesCount;
+ this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal;
+ this.drainingHashesUnackedMessages =
stats.drainingHashesUnackedMessages;
+ this.drainingHashes = stats.drainingHashes;
+ this.keyHashRanges = stats.keyHashRanges;
+ this.keyHashRangeArrays = stats.keyHashRangeArrays;
return this;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java
new file mode 100644
index 00000000000..134bdac597b
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DrainingHash;
+
+/**
+ * Contains information about a draining hash in a Key_Shared subscription.
+ * @see ConsumerStats
+ */
+@Data
+public class DrainingHashImpl implements DrainingHash {
+    /** The sticky key hash value of the draining hash. */
+    public int hash;
+    /** The number of unacknowledged messages for the draining hash. */
+    public int unackMsgs;
+    /** The number of times the hash has blocked an attempted delivery of a message. */
+    public int blockedAttempts;
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 12734a5586c..02df9b78700 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -129,6 +129,22 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
/** This is for Key_Shared subscription to get the recentJoinedConsumers
in the Key_Shared subscription. */
public Map<String, String> consumersAfterMarkDeletePosition;
+ /**
+ * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of
hashes in the draining state.
+ */
+ public int drainingHashesCount;
+
+ /**
+ * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of
hashes cleared from the draining state
+ * for the connected consumers.
+ */
+ public long drainingHashesClearedTotal;
+
+ /**
+ * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of
unacked messages for all draining hashes.
+ */
+ public int drainingHashesUnackedMessages;
+
/** The number of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRanges;
@@ -180,6 +196,9 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
lastMarkDeleteAdvancedTimestamp = 0L;
consumers.clear();
consumersAfterMarkDeletePosition.clear();
+ drainingHashesCount = 0;
+ drainingHashesClearedTotal = 0L;
+ drainingHashesUnackedMessages = 0;
nonContiguousDeletedMessagesRanges = 0;
nonContiguousDeletedMessagesRangesSerializedSize = 0;
earliestMsgPublishTimeInBacklog = 0L;
@@ -226,6 +245,9 @@ public class SubscriptionStatsImpl implements
SubscriptionStats {
}
this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery;
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
+ this.drainingHashesCount += stats.drainingHashesCount;
+ this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal;
+ this.drainingHashesUnackedMessages +=
stats.drainingHashesUnackedMessages;
this.nonContiguousDeletedMessagesRanges +=
stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize +=
stats.nonContiguousDeletedMessagesRangesSerializedSize;
if (this.earliestMsgPublishTimeInBacklog != 0 &&
stats.earliestMsgPublishTimeInBacklog != 0) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 7b235cfa341..b737d68d5ea 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.DrainingHash;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.FunctionInstanceStats;
@@ -96,6 +97,7 @@ import
org.apache.pulsar.common.policies.data.impl.BundlesDataImpl;
import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl;
import
org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import
org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import
org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
@@ -243,6 +245,7 @@ public class ObjectMapperFactory {
resolver.addMapping(DispatchRate.class, DispatchRateImpl.class);
resolver.addMapping(TopicStats.class, TopicStatsImpl.class);
resolver.addMapping(ConsumerStats.class, ConsumerStatsImpl.class);
+ resolver.addMapping(DrainingHash.class, DrainingHashImpl.class);
resolver.addMapping(NonPersistentPublisherStats.class,
NonPersistentPublisherStatsImpl.class);
resolver.addMapping(NonPersistentReplicatorStats.class,
NonPersistentReplicatorStatsImpl.class);
resolver.addMapping(NonPersistentSubscriptionStats.class,
NonPersistentSubscriptionStatsImpl.class);