This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 372f463a480 [improve] Eliminate unnecessary duplicate schema lookups for partitioned topics in client and geo-replication (#25011)
372f463a480 is described below

commit 372f463a4802030dbff7804f321b5c7b140f6bab
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 26 07:38:57 2025 +0200

    [improve] Eliminate unnecessary duplicate schema lookups for partitioned topics in client and geo-replication (#25011)
---
 .../pulsar/broker/service/persistent/PersistentReplicator.java       | 5 ++++-
 .../main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java    | 5 +++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index bda5acce236..e0a31476fc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.stats.Rate;
@@ -80,6 +81,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
 
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
+    protected final String localSchemaTopicName;
 
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
     private final Object dispatchRateLimiterLock = new Object();
@@ -123,6 +125,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
         super(localCluster, localTopic, remoteCluster, remoteTopic, 
localTopic.getReplicatorPrefix(),
                 brokerService, replicationClient);
         this.topic = localTopic;
+        this.localSchemaTopicName = 
TopicName.getPartitionedTopicName(localTopicName).toString();
         this.cursor = Objects.requireNonNull(cursor);
         this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
                 Codec.decode(cursor.getName()), cursor, null);
@@ -378,7 +381,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
         if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 
0) {
             return CompletableFuture.completedFuture(null);
         }
-        return client.getSchemaProviderLoadingCache().get(localTopicName)
+        return client.getSchemaProviderLoadingCache().get(localSchemaTopicName)
                 .getSchemaByVersion(msg.getSchemaVersion());
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 859480f6fc8..1d8ecb47278 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1363,10 +1363,11 @@ public class PulsarClientImpl implements PulsarClient {
                                                                       String 
topicName) {
         if (schema != null && schema.supportSchemaVersioning()) {
             final SchemaInfoProvider schemaInfoProvider;
+            String schemaTopicName = 
TopicName.getPartitionedTopicName(topicName).toString();
             try {
-                schemaInfoProvider = 
pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
+                schemaInfoProvider = 
pulsarClientImpl.getSchemaProviderLoadingCache().get(schemaTopicName);
             } catch (ExecutionException e) {
-                log.error("Failed to load schema info provider for topic {}", 
topicName, e);
+                log.error("Failed to load schema info provider for topic {}", 
schemaTopicName, e);
                 return FutureUtil.failedFuture(e.getCause());
             }
             schema = schema.clone();

Reply via email to