This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 991ce21048c5f1167033f07ca33682f681a41b79
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 26 07:38:57 2025 +0200

    [improve] Eliminate unnecessary duplicate schema lookups for partitioned topics in client and geo-replication (#25011)
    
    (cherry picked from commit 163f35fd77af5d2ac6191f837a9d726c172fb0bd)
---
 .../pulsar/broker/service/persistent/PersistentReplicator.java       | 5 ++++-
 .../main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java    | 5 +++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index bda5acce236..e0a31476fc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.stats.Rate;
@@ -80,6 +81,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
+    protected final String localSchemaTopicName;
 
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
     private final Object dispatchRateLimiterLock = new Object();
@@ -123,6 +125,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         super(localCluster, localTopic, remoteCluster, remoteTopic, 
localTopic.getReplicatorPrefix(),
                 brokerService, replicationClient);
         this.topic = localTopic;
+        this.localSchemaTopicName = 
TopicName.getPartitionedTopicName(localTopicName).toString();
         this.cursor = Objects.requireNonNull(cursor);
         this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
                 Codec.decode(cursor.getName()), cursor, null);
@@ -378,7 +381,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 
0) {
             return CompletableFuture.completedFuture(null);
         }
-        return client.getSchemaProviderLoadingCache().get(localTopicName)
+        return client.getSchemaProviderLoadingCache().get(localSchemaTopicName)
                 .getSchemaByVersion(msg.getSchemaVersion());
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f3edafc8efd..cac6617c9dd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1335,10 +1335,11 @@ public class PulsarClientImpl implements PulsarClient {
                                                                       String 
topicName) {
         if (schema != null && schema.supportSchemaVersioning()) {
             final SchemaInfoProvider schemaInfoProvider;
+            String schemaTopicName = 
TopicName.getPartitionedTopicName(topicName).toString();
             try {
-                schemaInfoProvider = 
pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
+                schemaInfoProvider = 
pulsarClientImpl.getSchemaProviderLoadingCache().get(schemaTopicName);
             } catch (ExecutionException e) {
-                log.error("Failed to load schema info provider for topic {}", 
topicName, e);
+                log.error("Failed to load schema info provider for topic {}", 
schemaTopicName, e);
                 return FutureUtil.failedFuture(e.getCause());
             }
             schema = schema.clone();

Reply via email to