Copilot commented on code in PR #25120:
URL: https://github.com/apache/pulsar/pull/25120#discussion_r2731511365
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java:
##########
@@ -294,4 +301,52 @@ public void testChunkMessagesNotBeLostNoConsumer() {
assertTrue(assignor.getUuidToConsumer().isEmpty());
}
+ /**
+ * Simulate the occurrence of chunk messages. When a message with chunk ID 0 is abnormally lost, subsequent chunk
+ * messages for that batch should be skipped instead of blocking the entire subscription.
+ */
+ @Test
+ public void testSkipOrphanChunk() {
+ cleanupQueue.clear();
+ Subscription subscription = mock(Subscription.class);
+ when(subscription.getTopicName()).thenReturn("test-topic");
+ when(subscription.getName()).thenReturn("test-sub");
+
+ assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, subscription);
+
+ final Consumer consumer = new Consumer("C1", 10);
+ roundRobinConsumerSelector.addConsumers(consumer);
+
+ List<EntryAndMetadata> entries = new ArrayList<>();
+ AtomicLong entryId = new AtomicLong(0);
+ MockProducer producer = new MockProducer("P", entryId, entries);
+
+ // 0:0@P-0
+ producer.sendMessage();
+
+ // Simulate the sending of chunk messages with missing chunkId '0'
+ producer.sendChunk(1, 3);
+ producer.sendChunk(2, 3);
+
+ // 0:3@P-2
+ producer.sendMessage();
+
+ // Add to cleanupQueue but skip the orphan chunk as it will be released by assignor
+ cleanupQueue.add(entries.get(0));
+ cleanupQueue.add(entries.get(3));
+
Review Comment:
This test clears `cleanupQueue` before `releaseEntries()` runs, so the
`EntryAndMetadata` objects created in `prepareData()` for this test are never
released, leaking their underlying `ByteBuf`s. Instead of clearing the queue
unconditionally, the pre-populated entries should be explicitly released (or
`prepareData` should be bypassed for this scenario) so that only the test-local
entries are left for `releaseEntries()` and there are no resource leaks during
the test run.
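
A minimal sketch of the first option, releasing whatever is already queued before emptying the queue. It does not use the real test fixtures: `FakeEntry` is a hypothetical stand-in for `EntryAndMetadata` with a simple reference count, `releaseAndClear` is an illustrative helper name, and the queue type is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class CleanupQueueSketch {

    /** Hypothetical stand-in for a reference-counted entry such as EntryAndMetadata. */
    static final class FakeEntry {
        private int refCnt = 1;

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("entry already released");
            }
            refCnt--;
        }

        int refCnt() {
            return refCnt;
        }
    }

    /** Releases every queued entry and empties the queue; returns how many entries were released. */
    static int releaseAndClear(Queue<FakeEntry> queue) {
        int released = 0;
        FakeEntry entry;
        while ((entry = queue.poll()) != null) {
            entry.release();
            released++;
        }
        return released;
    }

    public static void main(String[] args) {
        Queue<FakeEntry> cleanupQueue = new ArrayDeque<>();
        // Stands in for an entry that a setup method such as prepareData() queued earlier.
        FakeEntry prePopulated = new FakeEntry();
        cleanupQueue.add(prePopulated);

        // Instead of cleanupQueue.clear(), release what is already queued first.
        int released = releaseAndClear(cleanupQueue);
        System.out.println("released=" + released
                + ", refCnt=" + prePopulated.refCnt()
                + ", queueEmpty=" + cleanupQueue.isEmpty());
    }
}
```

In the real test the same effect could come from releasing each pre-populated entry and then adding only the test-local ones, so that the existing cleanup step sees exactly the entries it still owns.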
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -88,15 +91,46 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
+ availablePermits--;
} else {
- final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
+ if (metadata.getChunkId() != 0) {
+ if (subscription != null) {
+ log.warn("[{}][{}] Skip the message because it is not the first chunk."
+ + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
+ subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
+ metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
+ // Directly ack the message.
+ if (!(subscription instanceof PulsarCompactorSubscription)) {
+ subscription.acknowledgeMessage(Collections.singletonList(
+ entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
+ entryAndMetadata.release();
+ }
+ }
Review Comment:
In this orphan-chunk branch we only call `entryAndMetadata.release()` when
the subscription is *not* a `PulsarCompactorSubscription`. For compactor
subscriptions we log and `continue` without passing the entry to
`unassignedMessageProcessor` and without releasing it, so the underlying
`Entry`/`ByteBuf` is leaked. To avoid leaking resources, the entry should be
released regardless of subscription type (while still skipping the individual
ack for `PulsarCompactorSubscription`), for example by moving the `release()`
call outside the subscription-type check or adding a separate release in the
compactor case.
```suggestion
}
}
entryAndMetadata.release();
```
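
The control flow behind this suggestion can be illustrated with a small self-contained sketch. It does not use Pulsar classes: `FakeEntry` and `FakeSubscription` are hypothetical stand-ins, and `skipOrphanChunk` is an illustrative name for the orphan-chunk branch. The only point shown is that the ack stays conditional while the release is unconditional.

```java
import java.util.ArrayList;
import java.util.List;

public class OrphanChunkReleaseSketch {

    /** Hypothetical stand-in for EntryAndMetadata. */
    static final class FakeEntry {
        final long position;
        boolean released;

        FakeEntry(long position) {
            this.position = position;
        }

        void release() {
            released = true;
        }
    }

    /** Hypothetical stand-in for a subscription; 'compactor' mimics the instanceof check. */
    static final class FakeSubscription {
        final boolean compactor;
        final List<Long> acked = new ArrayList<>();

        FakeSubscription(boolean compactor) {
            this.compactor = compactor;
        }

        void acknowledge(long position) {
            acked.add(position);
        }
    }

    /** Skips an orphan chunk: ack only for non-compactor subscriptions, release in every case. */
    static void skipOrphanChunk(FakeSubscription subscription, FakeEntry entry) {
        if (subscription != null) {
            if (!subscription.compactor) {
                subscription.acknowledge(entry.position);
            }
        }
        // Released outside both checks, so no path leaves the entry unreleased.
        entry.release();
    }

    public static void main(String[] args) {
        FakeSubscription compactorSub = new FakeSubscription(true);
        FakeEntry entry = new FakeEntry(1L);
        skipOrphanChunk(compactorSub, entry);
        System.out.println("acked=" + compactorSub.acked + ", released=" + entry.released);
    }
}
```

With the release placed after both checks, the case where the subscription is absent is covered as well, which the original placement would also have missed.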