This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 991ce21048c5f1167033f07ca33682f681a41b79 Author: Lari Hotari <[email protected]> AuthorDate: Wed Nov 26 07:38:57 2025 +0200 [improve] Eliminate unnecessary duplicate schema lookups for partitioned topics in client and geo-replication (#25011) (cherry picked from commit 163f35fd77af5d2ac6191f837a9d726c172fb0bd) --- .../pulsar/broker/service/persistent/PersistentReplicator.java | 5 ++++- .../main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index bda5acce236..e0a31476fc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -67,6 +67,7 @@ import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.stats.Rate; @@ -80,6 +81,7 @@ public abstract class PersistentReplicator extends AbstractReplicator protected final PersistentTopic topic; protected final ManagedCursor cursor; + protected final String localSchemaTopicName; protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty(); private final Object dispatchRateLimiterLock = new Object(); @@ -123,6 +125,7 @@ public abstract class PersistentReplicator extends AbstractReplicator super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(), brokerService, replicationClient); this.topic = localTopic; + this.localSchemaTopicName = TopicName.getPartitionedTopicName(localTopicName).toString(); this.cursor = Objects.requireNonNull(cursor); this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic, Codec.decode(cursor.getName()), cursor, null); @@ -378,7 +381,7 @@ public abstract class PersistentReplicator extends AbstractReplicator if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) { return CompletableFuture.completedFuture(null); } - return client.getSchemaProviderLoadingCache().get(localTopicName) + return client.getSchemaProviderLoadingCache().get(localSchemaTopicName) .getSchemaByVersion(msg.getSchemaVersion()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f3edafc8efd..cac6617c9dd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -1335,10 +1335,11 @@ public class PulsarClientImpl implements PulsarClient { String topicName) { if (schema != null && schema.supportSchemaVersioning()) { final SchemaInfoProvider schemaInfoProvider; + String schemaTopicName = TopicName.getPartitionedTopicName(topicName).toString(); try { - schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName); + schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(schemaTopicName); } catch (ExecutionException e) { - log.error("Failed to load schema info provider for topic {}", topicName, e); + log.error("Failed to load schema info provider for topic {}", schemaTopicName, e); return FutureUtil.failedFuture(e.getCause()); } schema = schema.clone();
