Copilot commented on code in PR #25120:
URL: https://github.com/apache/pulsar/pull/25120#discussion_r2657434332


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
 
             if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
+                availablePermits--;
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+                final String uuid = metadata.getUuid();
+                Consumer consumerForUuid = uuidToConsumer.get(uuid);
                 if (consumerForUuid == null) {
-                    unassignedMessageProcessor.accept(entryAndMetadata);
-                    continue;
+                    if (metadata.getChunkId() != 0) {
+                        if (subscription != null) {
+                            log.warn("[{}][{}] Skip the message because of it not the first chunk."
+                                            + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
+                                    subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
+                                    metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
+                            // Directly ack the message
+                            if (!(subscription instanceof PulsarCompactorSubscription)) {

Review Comment:
   The log message is split into two string literals joined with `+`, which is harder to read than a single template. Consider writing the whole format string as one literal. Both operands are compile-time constants, so the compiler folds the `+` and it costs nothing at runtime; this is purely a readability change, although a single literal does produce a much longer source line.
   ```suggestion
                               log.warn("[{}][{}] Skip the message because it is not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
                                       subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
                                       metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
                               // Directly ack the message
                               if (!(subscription instanceof PulsarCompactorSubscription)) {
   ```
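The split and single-literal forms yield the same template. In Java, a `+` between two string literals is a constant expression, which is evaluated at compile time and interned, so both spellings refer to the very same `String` object. The class name `ConstantFoldingDemo` below is purely illustrative.

```java
// Illustrative only: shows that a format string split across two literals joined with `+`
// is folded by the compiler into a single interned constant, identical to one long literal.
public class ConstantFoldingDemo {

    static final String SPLIT = "[{}][{}] Skip the message because it is not the first chunk."
            + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}";

    static final String SINGLE =
            "[{}][{}] Skip the message because it is not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}";

    public static void main(String[] args) {
        // Same interned object, so reference equality holds (not just equals()).
        System.out.println(SPLIT == SINGLE);      // prints true
        System.out.println(SPLIT.equals(SINGLE)); // prints true
    }
}
```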



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
 
             if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
+                availablePermits--;
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+                final String uuid = metadata.getUuid();
+                Consumer consumerForUuid = uuidToConsumer.get(uuid);
                 if (consumerForUuid == null) {
-                    unassignedMessageProcessor.accept(entryAndMetadata);
-                    continue;
+                    if (metadata.getChunkId() != 0) {
+                        if (subscription != null) {
+                            log.warn("[{}][{}] Skip the message because of it not the first chunk."
+                                            + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
+                                    subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
+                                    metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
+                            // Directly ack the message
+                            if (!(subscription instanceof PulsarCompactorSubscription)) {
+                                subscription.acknowledgeMessage(Collections.singletonList(
+                                        entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
+                                entryAndMetadata.release();
+                            }
+                        }

Review Comment:
   When `subscription` is null, or when it is a `PulsarCompactorSubscription`, a non-first chunk with no cached consumer is neither acknowledged nor released, because the acknowledgment branch at lines 97-108 is skipped. The code nevertheless goes on to cache the UUID mapping (line 111) and add the entry to `consumerToEntries` (line 120). As a result, later chunks with the same UUID are routed to this consumer even though chunk 0 was never delivered. This partially defeats the purpose of the fix: in these cases the subscription still receives incomplete chunked messages.
   ```suggestion
                                   entryAndMetadata.release();
                               } else {
                                   // For compactor subscriptions, just release the entry without ack
                                   entryAndMetadata.release();
                               }
                           } else {
                               // No subscription available, just release the entry
                               entryAndMetadata.release();
                           }
                           // Do not cache UUID mapping or deliver this non-first chunk
                           continue;
   ```
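The intended control flow can be sketched as a toy model. Everything in it is hypothetical: `ChunkRoutingSketch`, `Chunk` and `SubscriptionKind` are not Pulsar types, and permits, logging and the real ack call are left out. The model only shows what the `continue` guarantees. An orphan non-first chunk is released on every path, acknowledged only for a regular subscription, and never cached or delivered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the chunk-routing loop with the suggested fix applied.
// Not Pulsar code: permits, logging and real acknowledgments are intentionally omitted.
public class ChunkRoutingSketch {

    enum SubscriptionKind { NONE, REGULAR, COMPACTOR }

    record Chunk(String uuid, int chunkId, int numChunks) { }

    final Map<String, String> uuidToConsumer = new HashMap<>();
    final List<Chunk> delivered = new ArrayList<>();
    final List<Chunk> acked = new ArrayList<>();
    final List<Chunk> released = new ArrayList<>();

    void assign(List<Chunk> chunks, String consumer, SubscriptionKind kind) {
        for (Chunk chunk : chunks) {
            String consumerForUuid = uuidToConsumer.get(chunk.uuid());
            if (consumerForUuid == null) {
                if (chunk.chunkId() != 0) {
                    // Chunk 0 was never seen: ack only for a regular subscription, always release.
                    if (kind == SubscriptionKind.REGULAR) {
                        acked.add(chunk);
                    }
                    released.add(chunk);
                    continue; // neither cache the UUID nor deliver this chunk
                }
                consumerForUuid = consumer;
                uuidToConsumer.put(chunk.uuid(), consumerForUuid);
            }
            if (chunk.chunkId() == chunk.numChunks() - 1) {
                // Last chunk received: remove the UUID from the uuidToConsumer map.
                uuidToConsumer.remove(chunk.uuid());
            }
            delivered.add(chunk);
        }
    }

    public static void main(String[] args) {
        List<Chunk> chunks = List.of(
                new Chunk("u1", 1, 3), new Chunk("u1", 2, 3),  // chunk 0 of u1 is missing
                new Chunk("u2", 0, 2), new Chunk("u2", 1, 2)); // complete two-chunk message
        for (SubscriptionKind kind : SubscriptionKind.values()) {
            ChunkRoutingSketch sketch = new ChunkRoutingSketch();
            sketch.assign(chunks, "consumer-1", kind);
            System.out.println(kind + ": delivered=" + sketch.delivered.size()
                    + ", acked=" + sketch.acked.size() + ", released=" + sketch.released.size());
        }
    }
}
```

For every subscription kind, only the two `u2` chunks are delivered and both `u1` chunks are released. The `u1` chunks are acknowledged only in the `REGULAR` case, and the map ends up empty.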



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java:
##########
@@ -58,7 +58,7 @@ public void prepareData() {
         roundRobinConsumerSelector.clear();
         entryAndMetadataList.clear();
         replayQueue.clear();
-        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add);
+        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null);

Review Comment:
   The test passes null for the subscription parameter, which means the new 
logic for handling missing chunk0 (lines 97-108 in SharedConsumerAssignor.java) 
is not covered by existing tests. Consider adding test cases that:
   1. Verify non-first chunks are properly acknowledged when chunk0 is missing
   2. Test the behavior when subscription is a PulsarCompactorSubscription
   3. Ensure that acknowledged chunks don't get dispatched to consumers



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
 
             if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
+                availablePermits--;
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+                final String uuid = metadata.getUuid();
+                Consumer consumerForUuid = uuidToConsumer.get(uuid);
                 if (consumerForUuid == null) {
-                    unassignedMessageProcessor.accept(entryAndMetadata);
-                    continue;
+                    if (metadata.getChunkId() != 0) {
+                        if (subscription != null) {
+                            log.warn("[{}][{}] Skip the message because of it not the first chunk."
+                                            + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
+                                    subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
+                                    metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
+                            // Directly ack the message
+                            if (!(subscription instanceof PulsarCompactorSubscription)) {
+                                subscription.acknowledgeMessage(Collections.singletonList(
+                                        entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
+                                entryAndMetadata.release();
+                            }
+                        }
+                    }
+                    consumerForUuid = consumer;
+                    uuidToConsumer.put(uuid, consumerForUuid);
+                }
+
+                final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
+                if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+                    // The last chunk is received, we should remove the uuid

Review Comment:
   The comment says "we should remove the uuid"; "we should remove the uuid from the cache" (i.e. from `uuidToConsumer`) would be clearer.
   ```suggestion
                       // The last chunk is received, we should remove the uuid from the cache
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -20,18 +20,24 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
+import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 
 /**
  * The assigner to assign entries to the proper {@link Consumer} in the shared subscription.
  */
+

Review Comment:
   There is an unnecessary blank line between the Javadoc comment and the class 
annotations. The annotations should immediately follow the Javadoc without a 
blank line in between.
   ```suggestion
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
 
             if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
+                availablePermits--;
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+                final String uuid = metadata.getUuid();
+                Consumer consumerForUuid = uuidToConsumer.get(uuid);
                 if (consumerForUuid == null) {
-                    unassignedMessageProcessor.accept(entryAndMetadata);
-                    continue;
+                    if (metadata.getChunkId() != 0) {
+                        if (subscription != null) {
+                            log.warn("[{}][{}] Skip the message because of it not the first chunk."

Review Comment:
   The log message contains a grammatical error: "because of it not the first chunk" should read "because it is not the first chunk".
   ```suggestion
                               log.warn("[{}][{}] Skip the message because it is not the first chunk."
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -80,15 +88,41 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
 
             if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
+                availablePermits--;
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+                final String uuid = metadata.getUuid();
+                Consumer consumerForUuid = uuidToConsumer.get(uuid);
                 if (consumerForUuid == null) {
-                    unassignedMessageProcessor.accept(entryAndMetadata);
-                    continue;
+                    if (metadata.getChunkId() != 0) {
+                        if (subscription != null) {
+                            log.warn("[{}][{}] Skip the message because of it not the first chunk."
+                                            + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
+                                    subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
+                                    metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
+                            // Directly ack the message
+                            if (!(subscription instanceof PulsarCompactorSubscription)) {
+                                subscription.acknowledgeMessage(Collections.singletonList(
+                                        entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
+                                entryAndMetadata.release();
+                            }
+                        }
+                    }
+                    consumerForUuid = consumer;
+                    uuidToConsumer.put(uuid, consumerForUuid);
+                }
+
+                final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
+                if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+                    // The last chunk is received, we should remove the uuid
+                    uuidToConsumer.remove(uuid);
                 }
+
                 consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata);

Review Comment:
   When a non-first chunk is acknowledged and released (lines 104-106), it should not be processed any further. However, the code goes on to cache its UUID mapping (line 111) and to add the already-released entry to `consumerToEntries` (line 120). The consumer would then receive an entry whose buffer has already been released, which can cause reference-counting errors or access to freed memory. After acknowledging and releasing the entry, the code should `continue` to the next loop iteration.
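The hazard can be illustrated with a toy reference-counted entry. `ToyEntry` and `ReleasedEntrySketch` are hypothetical and only loosely mimic reference-counted buffers; Netty's `ByteBuf`, for instance, throws an `IllegalReferenceCountException` when it is accessed after its count reaches zero. The sketch shows why dispatching an entry after `release()` fails instead of quietly working.

```java
// Hypothetical toy reference-counted entry, not a Pulsar or Netty class.
// Once the count drops to zero the payload is gone; later reads or releases fail loudly.
public class ReleasedEntrySketch {

    static final class ToyEntry {
        private int refCnt = 1;
        private byte[] payload;

        ToyEntry(byte[] payload) {
            this.payload = payload;
        }

        byte[] payload() {
            if (refCnt <= 0) {
                throw new IllegalStateException("entry already released, refCnt=" + refCnt);
            }
            return payload;
        }

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("double release, refCnt=" + refCnt);
            }
            if (--refCnt == 0) {
                payload = null; // a pooled buffer would be recycled at this point
            }
        }

        int refCnt() {
            return refCnt;
        }
    }

    public static void main(String[] args) {
        ToyEntry entry = new ToyEntry(new byte[] {1, 2, 3});
        entry.release(); // the orphan chunk has been acked and released...
        try {
            entry.payload(); // ...so a later dispatch that reads it must fail
        } catch (IllegalStateException e) {
            System.out.println("dispatch failed: " + e.getMessage());
        }
    }
}
```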


