Denovo1998 commented on code in PR #25555:
URL: https://github.com/apache/pulsar/pull/25555#discussion_r3234858904


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##########
@@ -292,26 +298,9 @@ private boolean isSupportsReplDedupByLidAndEid() {
         return cnx.isClientSupportsReplDedupByLidAndEid() && 
topic.isPersistent();
     }
 
-    private void publishMessageToTopic(ByteBuf headersAndPayload, long 
sequenceId, int batchSize, boolean isChunked,
-                                       boolean isMarker, Position position) {
-        MessagePublishContext messagePublishContext =
-                MessagePublishContext.get(this, sequenceId, 
headersAndPayload.readableBytes(),
-                        batchSize, isChunked, System.nanoTime(), isMarker, 
position, isSupportsReplDedupByLidAndEid());
-        if (brokerInterceptor != null) {
-            brokerInterceptor
-                    .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
-        }
-        topic.publishMessage(headersAndPayload, messagePublishContext);
-    }
-
-    private void publishMessageToTopic(ByteBuf headersAndPayload, long 
lowestSequenceId, long highestSequenceId,
-                                       int batchSize, boolean isChunked, 
boolean isMarker, Position position) {
-        MessagePublishContext messagePublishContext = 
MessagePublishContext.get(this, lowestSequenceId,
-                highestSequenceId, headersAndPayload.readableBytes(), 
batchSize,
-                isChunked, System.nanoTime(), isMarker, position, 
isSupportsReplDedupByLidAndEid());
+    private void publishMessageToTopic(ByteBuf headersAndPayload, 
MessagePublishContext messagePublishContext) {
         if (brokerInterceptor != null) {
-            brokerInterceptor
-                    .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
+            brokerInterceptor.onMessagePublish(this, headersAndPayload, 
messagePublishContext);

Review Comment:
   The PR body says the cached metadata is invalidated after 
BrokerInterceptor.onMessagePublish(...), but I don't see that in the final diff.
   
   This changes behavior when metadata has already been cached before the 
interceptor runs. For example, on an encryption-required topic, 
checkAndStartPublish(...) calls publishContext.getMessageMetadata(...). If an 
interceptor then mutates the message metadata in headersAndPayload, 
PersistentTopic.isExceedMaximumDeliveryDelay(...) and MessageDeduplication will 
reuse the pre-interceptor cached metadata. In current master those later checks 
reparse headersAndPayload after the interceptor, so they observe the 
post-interceptor metadata.
   
   Could we add a 
MessagePublishContext#clearMessageMetadata()/invalidateMessageMetadata() and 
call it immediately after BrokerInterceptor.onMessagePublish(...) in both the 
normal publish path and publishTxnMessage?
   
   ---
   
   If multiple interceptors are executed in series and the first interceptor 
modifies metadata, and the second interceptor reads metadata through 
publishContext.getMessageMetadata(...), then clearing the cache only after the 
entire BrokerInterceptors returns cannot solve the visibility within the 
interceptor chain?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to