Denovo1998 commented on code in PR #25817:
URL: https://github.com/apache/pulsar/pull/25817#discussion_r3303886313
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -640,6 +640,80 @@ public void testBatchMessageWithNullValue() throws Exception {
assertEquals(messages.get(2).getKey(), "key5");
}
+ /**
+ * Write raw non-batch entries directly to the managed ledger, simulating
+ * messages from C++/Python clients that do not set numMessagesInBatch.
+ * Verifies that null-value tombstones remove keys during compaction.
Review Comment:
Small wording nit: this comment says the raw entries simulate C++/Python
clients because they do not set `numMessagesInBatch`. The Java non-batch send
path also does not set `MessageMetadata.numMessagesInBatch`; Java avoids the
old bug mainly because `ProducerImpl#updateMessageMetadata` sets
`uncompressedSize=0` for null payloads.
Could we reword this as "raw non-batch entries without `uncompressedSize`,
as seen with some non-Java clients" so future readers don't infer that
`numMessagesInBatch` is what protects Java non-batch messages?
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java:
##########
@@ -462,14 +462,22 @@ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage
return bkf;
}
+ /**
+ * Extract the partition key and the payload size for a non-batch message.
+ *
+ * @return a pair of (partitionKey, payloadSize), or null if the message has no partition key.
+ */
protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) {
- ByteBuf headersAndPayload = m.getHeadersAndPayload();
if (msgMetadata.hasPartitionKey()) {
- int size = headersAndPayload.readableBytes();
+ int payloadSize;
if (msgMetadata.hasUncompressedSize()) {
- size = msgMetadata.getUncompressedSize();
+ payloadSize = msgMetadata.getUncompressedSize();
+ } else {
+ ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
+ Commands.skipMessageMetadata(headersAndPayload);
+ payloadSize = headersAndPayload.readableBytes();
Review Comment:
Should we check `msgMetadata.hasNullValue() && msgMetadata.isNullValue()`
before deriving the size from payload bytes?
The new payload-only calculation fixes plain non-batch tombstones, but it
can still keep encrypted tombstones when the producer does not set
`uncompressedSize`. Encryption can turn an empty/null payload into non-empty
ciphertext, so after `skipMessageMetadata(...)`, `readableBytes()` may still be
greater than 0 and phase one will put the tombstone into `latestForKey` instead
of removing the key.
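To illustrate the ciphertext point, here is a minimal standalone sketch. It assumes an AES-GCM cipher with a 128-bit tag, which is my assumption for illustration and not taken from this PR; the class and method names are hypothetical. With that setup, encrypting a zero-length payload still produces a non-empty result because the authentication tag is appended.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical demo class, not part of Pulsar.
class EmptyPlaintextCiphertextSketch {

    // Encrypts the given plaintext with a throwaway AES-GCM key and returns
    // the ciphertext length in bytes (payload plus the 16-byte GCM tag).
    static int ciphertextLength(byte[] plaintext) {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance("AES");
            keyGen.init(128);
            SecretKey key = keyGen.generateKey();

            byte[] iv = new byte[12]; // 96-bit nonce, the usual GCM size
            new SecureRandom().nextBytes(iv);

            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
            return cipher.doFinal(plaintext).length;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // An empty payload still yields 16 bytes: the authentication tag alone.
        System.out.println(ciphertextLength(new byte[0])); // prints 16
    }
}
```

So a size derived from the bytes after the metadata cannot distinguish an encrypted tombstone from a small encrypted value.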
Java producers usually avoid this because
`ProducerImpl#updateMessageMetadata` sets `uncompressedSize=0` for null
payloads, but this PR is specifically fixing producers whose metadata does not
include that size. The explicit `nullValue` flag is the protocol-level
tombstone signal, so I think this helper should return size 0 for
`nullValue=true` before falling back to `uncompressedSize` or payload bytes.
```java
if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) {
payloadSize = 0;
} else if (msgMetadata.hasUncompressedSize()) {
payloadSize = msgMetadata.getUncompressedSize();
} else {
ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
Commands.skipMessageMetadata(headersAndPayload);
payloadSize = headersAndPayload.readableBytes();
}
```
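For anyone who wants to check the precedence in isolation, here is a self-contained sketch of the same ordering without any Pulsar dependencies. `Meta` and `resolvePayloadSize` are hypothetical stand-ins for `MessageMetadata` and the size logic in `extractKeyAndSize`; a `null` field models an unset optional field, and `payloadBytesAfterMetadata` models `readableBytes()` after `skipMessageMetadata(...)`.

```java
// Hypothetical model of the proposed ordering, not the actual Pulsar classes.
class PayloadSizeSketch {

    // Stand-in for the two metadata fields involved; null means "not set".
    static final class Meta {
        final Boolean nullValue;
        final Integer uncompressedSize;

        Meta(Boolean nullValue, Integer uncompressedSize) {
            this.nullValue = nullValue;
            this.uncompressedSize = uncompressedSize;
        }
    }

    // Resolution order: explicit nullValue flag, then uncompressedSize,
    // then the number of bytes remaining after the metadata.
    static int resolvePayloadSize(Meta meta, int payloadBytesAfterMetadata) {
        if (meta.nullValue != null && meta.nullValue) {
            return 0; // protocol-level tombstone, regardless of bytes on the wire
        }
        if (meta.uncompressedSize != null) {
            return meta.uncompressedSize;
        }
        return payloadBytesAfterMetadata;
    }

    public static void main(String[] args) {
        // Encrypted tombstone without uncompressedSize: 16 bytes of ciphertext, size 0.
        System.out.println(resolvePayloadSize(new Meta(true, null), 16));  // prints 0
        // Plain tombstone without either field: falls back to payload bytes.
        System.out.println(resolvePayloadSize(new Meta(null, null), 0));   // prints 0
        // Regular message with uncompressedSize set.
        System.out.println(resolvePayloadSize(new Meta(false, 42), 16));   // prints 42
    }
}
```

The first case is the one the current patch would miss: without the `nullValue` branch it would resolve to 16 and the key would be kept.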