Denovo1998 commented on code in PR #25555:
URL: https://github.com/apache/pulsar/pull/25555#discussion_r3234858904
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##########
@@ -292,26 +298,9 @@ private boolean isSupportsReplDedupByLidAndEid() {
return cnx.isClientSupportsReplDedupByLidAndEid() &&
topic.isPersistent();
}
- private void publishMessageToTopic(ByteBuf headersAndPayload, long
sequenceId, int batchSize, boolean isChunked,
- boolean isMarker, Position position) {
- MessagePublishContext messagePublishContext =
- MessagePublishContext.get(this, sequenceId,
headersAndPayload.readableBytes(),
- batchSize, isChunked, System.nanoTime(), isMarker,
position, isSupportsReplDedupByLidAndEid());
- if (brokerInterceptor != null) {
- brokerInterceptor
- .onMessagePublish(this, headersAndPayload,
messagePublishContext);
- }
- topic.publishMessage(headersAndPayload, messagePublishContext);
- }
-
- private void publishMessageToTopic(ByteBuf headersAndPayload, long
lowestSequenceId, long highestSequenceId,
- int batchSize, boolean isChunked,
boolean isMarker, Position position) {
- MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, lowestSequenceId,
- highestSequenceId, headersAndPayload.readableBytes(),
batchSize,
- isChunked, System.nanoTime(), isMarker, position,
isSupportsReplDedupByLidAndEid());
+ private void publishMessageToTopic(ByteBuf headersAndPayload,
MessagePublishContext messagePublishContext) {
if (brokerInterceptor != null) {
- brokerInterceptor
- .onMessagePublish(this, headersAndPayload,
messagePublishContext);
+ brokerInterceptor.onMessagePublish(this, headersAndPayload,
messagePublishContext);
Review Comment:
The PR body says the cached metadata is invalidated after
BrokerInterceptor.onMessagePublish(...), but I don't see that in the final diff.
This changes behavior when metadata has already been cached before the
interceptor runs. For example, on an encryption-required topic,
checkAndStartPublish(...) calls publishContext.getMessageMetadata(...). If an
interceptor then mutates the message metadata in headersAndPayload,
PersistentTopic.isExceedMaximumDeliveryDelay(...) and MessageDeduplication will
reuse the pre-interceptor cached metadata. In current master those later checks
reparse headersAndPayload after the interceptor, so they observe the
post-interceptor metadata.
Could we add a
MessagePublishContext#clearMessageMetadata()/invalidateMessageMetadata() and
call it immediately after BrokerInterceptor.onMessagePublish(...) in both the
normal publish path and publishTxnMessage?
---
If multiple interceptors are executed in series and the first interceptor
modifies metadata, and the second interceptor reads metadata through
publishContext.getMessageMetadata(...), then clearing the cache only after the
entire BrokerInterceptors returns cannot solve the visibility within the
interceptor chain?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]