Copilot commented on code in PR #25120:
URL: https://github.com/apache/pulsar/pull/25120#discussion_r2731511365


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java:
##########
@@ -294,4 +301,52 @@ public void testChunkMessagesNotBeLostNoConsumer() {
         assertTrue(assignor.getUuidToConsumer().isEmpty());
     }
 
+    /**
+     * Simulate the occurrence of chunk messages. When a message with chunk ID 0 is abnormally lost, subsequent chunk
+     * messages for that batch should be skipped instead of blocking the entire subscription.
+     */
+    @Test
+    public void testSkipOrphanChunk() {
+        cleanupQueue.clear();
+        Subscription subscription = mock(Subscription.class);
+        when(subscription.getTopicName()).thenReturn("test-topic");
+        when(subscription.getName()).thenReturn("test-sub");
+
+        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, subscription);
+
+        final Consumer consumer = new Consumer("C1", 10);
+        roundRobinConsumerSelector.addConsumers(consumer);
+
+        List<EntryAndMetadata> entries = new ArrayList<>();
+        AtomicLong entryId = new AtomicLong(0);
+        MockProducer producer = new MockProducer("P", entryId, entries);
+
+        // 0:0@P-0
+        producer.sendMessage();
+
+        // Simulate the sending of chunk messages with missing chunkId '0'
+        producer.sendChunk(1, 3);
+        producer.sendChunk(2, 3);
+
+        // 0:3@P-2
+        producer.sendMessage();
+
+        // Add to cleanupQueue but skip the orphan chunk as it will be released by assignor
+        cleanupQueue.add(entries.get(0));
+        cleanupQueue.add(entries.get(3));
+

Review Comment:
   This test clears `cleanupQueue` before `releaseEntries()` runs, so the 
`EntryAndMetadata` objects created in `prepareData()` for this test are never 
released, leaking their underlying `ByteBuf`s. Instead of clearing the queue 
unconditionally, the pre-populated entries should be explicitly released (or 
`prepareData` should be bypassed for this scenario) so that only the test-local 
entries are left for `releaseEntries()` and there are no resource leaks during 
the test run.
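
A minimal sketch of the "release before clearing" idea, using stand-in types rather than the real test fixtures. `FakeEntry` and `CleanupQueueSketch.releaseAndClear` are hypothetical names introduced only for illustration; in the actual test the queue would hold `EntryAndMetadata` objects and the helper would call their `release()`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for EntryAndMetadata: only records whether release() was called.
final class FakeEntry {
    private boolean released;

    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

final class CleanupQueueSketch {
    // Drain the queue and release every entry, instead of calling clear(),
    // which would drop the references without releasing them.
    static int releaseAndClear(Queue<FakeEntry> queue) {
        int count = 0;
        FakeEntry entry;
        while ((entry = queue.poll()) != null) {
            entry.release();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Queue<FakeEntry> cleanupQueue = new ArrayDeque<>();
        cleanupQueue.add(new FakeEntry()); // plays the role of an entry created during setup
        int released = releaseAndClear(cleanupQueue);
        System.out.println("released=" + released + ", empty=" + cleanupQueue.isEmpty());
    }
}
```

With a helper like this at the top of the test, only the test-local entries added afterwards would remain in the queue for the existing cleanup step.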



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -88,15 +91,46 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
 
             if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
+                availablePermits--;
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+                final String uuid = metadata.getUuid();
+                Consumer consumerForUuid = uuidToConsumer.get(uuid);
                 if (consumerForUuid == null) {
+                    if (metadata.getChunkId() != 0) {
+                        if (subscription != null) {
+                            log.warn("[{}][{}] Skip the message because it is not the first chunk."
+                                            + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
+                                    subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
+                                    metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
+                            // Directly ack the message.
+                            if (!(subscription instanceof PulsarCompactorSubscription)) {
+                                subscription.acknowledgeMessage(Collections.singletonList(
+                                        entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
+                                entryAndMetadata.release();
+                            }
+                        }

Review Comment:
   In this orphan-chunk branch we only call `entryAndMetadata.release()` when 
the subscription is *not* a `PulsarCompactorSubscription`. For compactor 
subscriptions we log and `continue` without passing the entry to 
`unassignedMessageProcessor` and without releasing it, so the underlying 
`Entry`/`ByteBuf` is leaked. To avoid leaking resources, the entry should be 
released regardless of subscription type (while still skipping the individual 
ack for `PulsarCompactorSubscription`), for example by moving the `release()` 
call outside the subscription-type check or adding a separate release in the 
compactor case.
   ```suggestion
                               }
                           }
                           entryAndMetadata.release();
   ```
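
The control flow described above can be sketched in isolation as follows. This is an illustration under stated assumptions, not the broker code: `FakeEntry`, `FakeSubscription`, `FakeCompactorSubscription`, and `skipOrphanChunk` are hypothetical stand-ins for `EntryAndMetadata`, `Subscription`, `PulsarCompactorSubscription`, and the orphan-chunk branch of `assign`. The point it shows is that the ack is conditional on the subscription type while the release is not.

```java
import java.util.ArrayList;
import java.util.List;

final class OrphanChunkSketch {
    // Hypothetical stand-in for EntryAndMetadata.
    static final class FakeEntry {
        final long position;
        boolean released;

        FakeEntry(long position) {
            this.position = position;
        }

        void release() {
            released = true;
        }
    }

    // Hypothetical stand-in for Subscription; records the positions it was asked to ack.
    static class FakeSubscription {
        final List<Long> acked = new ArrayList<>();

        void acknowledge(long position) {
            acked.add(position);
        }
    }

    // Hypothetical stand-in for PulsarCompactorSubscription.
    static final class FakeCompactorSubscription extends FakeSubscription {
    }

    // Ack only for non-compactor subscriptions, but always release the entry.
    static void skipOrphanChunk(FakeSubscription subscription, FakeEntry entry) {
        if (subscription != null && !(subscription instanceof FakeCompactorSubscription)) {
            subscription.acknowledge(entry.position);
        }
        entry.release(); // runs for every subscription type, including null
    }

    public static void main(String[] args) {
        FakeSubscription compactor = new FakeCompactorSubscription();
        FakeEntry orphan = new FakeEntry(1L);
        skipOrphanChunk(compactor, orphan);
        System.out.println("acked=" + compactor.acked.size() + ", released=" + orphan.released);
    }
}
```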


