This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 372f463a480 [improve] Eliminate unnecessary duplicate schema lookups
for partitioned topics in client and geo-replication (#25011)
372f463a480 is described below
commit 372f463a4802030dbff7804f321b5c7b140f6bab
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 26 07:38:57 2025 +0200
[improve] Eliminate unnecessary duplicate schema lookups for partitioned
topics in client and geo-replication (#25011)
---
.../pulsar/broker/service/persistent/PersistentReplicator.java | 5 ++++-
.../main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 5 +++--
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index bda5acce236..e0a31476fc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
@@ -80,6 +81,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
+ protected final String localSchemaTopicName;
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private final Object dispatchRateLimiterLock = new Object();
@@ -123,6 +125,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
brokerService, replicationClient);
this.topic = localTopic;
+ this.localSchemaTopicName = TopicName.getPartitionedTopicName(localTopicName).toString();
this.cursor = Objects.requireNonNull(cursor);
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
Codec.decode(cursor.getName()), cursor, null);
@@ -378,7 +381,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
return CompletableFuture.completedFuture(null);
}
- return client.getSchemaProviderLoadingCache().get(localTopicName)
+ return client.getSchemaProviderLoadingCache().get(localSchemaTopicName)
.getSchemaByVersion(msg.getSchemaVersion());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 859480f6fc8..1d8ecb47278 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1363,10 +1363,11 @@ public class PulsarClientImpl implements PulsarClient {
String topicName) {
if (schema != null && schema.supportSchemaVersioning()) {
final SchemaInfoProvider schemaInfoProvider;
+ String schemaTopicName = TopicName.getPartitionedTopicName(topicName).toString();
try {
- schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
+ schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(schemaTopicName);
} catch (ExecutionException e) {
- log.error("Failed to load schema info provider for topic {}", topicName, e);
+ log.error("Failed to load schema info provider for topic {}", schemaTopicName, e);
return FutureUtil.failedFuture(e.getCause());
}
schema = schema.clone();