This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 337ce5a6ab81d497e4b4a986604ace30bc41c115
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 26 07:38:57 2025 +0200

    [improve] Eliminate unnecessary duplicate schema lookups for partitioned topics in client and geo-replication (#25011)

    (cherry picked from commit 163f35fd77af5d2ac6191f837a9d726c172fb0bd)
---
 .../pulsar/broker/service/persistent/PersistentReplicator.java    | 8 ++++++--
 .../main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 5 +++--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 4661ece815a..b23f17dfc16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -30,6 +30,7 @@ import io.netty.util.Recycler.Handle;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -65,6 +66,7 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.stats.Rate;
@@ -78,6 +80,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
 
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
+    protected final String localSchemaTopicName;
 
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
     private final Object dispatchRateLimiterLock = new Object();
@@ -120,7 +123,8 @@ public abstract class PersistentReplicator extends AbstractReplicator
         super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
                 brokerService, replicationClient);
         this.topic = localTopic;
-        this.cursor = cursor;
+        this.localSchemaTopicName = TopicName.getPartitionedTopicName(localTopicName).toString();
+        this.cursor = Objects.requireNonNull(cursor);
         this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName,
                 Codec.decode(cursor.getName()), cursor, null);
 
@@ -373,7 +377,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
         if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) {
             return CompletableFuture.completedFuture(null);
         }
-        return client.getSchemaProviderLoadingCache().get(localTopicName)
+        return client.getSchemaProviderLoadingCache().get(localSchemaTopicName)
                 .getSchemaByVersion(msg.getSchemaVersion());
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c25d2397229..e2c875f6faa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1297,10 +1297,11 @@ public class PulsarClientImpl implements PulsarClient {
                                                                       String topicName) {
         if (schema != null && schema.supportSchemaVersioning()) {
             final SchemaInfoProvider schemaInfoProvider;
+            String schemaTopicName = TopicName.getPartitionedTopicName(topicName).toString();
             try {
-                schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
+                schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(schemaTopicName);
             } catch (ExecutionException e) {
-                log.error("Failed to load schema info provider for topic {}", topicName, e);
+                log.error("Failed to load schema info provider for topic {}", schemaTopicName, e);
                 return FutureUtil.failedFuture(e.getCause());
             }
             schema = schema.clone();
