lhotari commented on code in PR #25120:
URL: https://github.com/apache/pulsar/pull/25120#discussion_r2765240365
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -106,6 +105,64 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
return consumerToEntries;
}
+ private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata
metadata, Consumer consumer,
+ Map<Consumer, List<EntryAndMetadata>> consumerToEntries, int
availablePermits) {
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
+ if (consumerForUuid == null) {
+ if (skipChunk(entryAndMetadata, metadata)) {
+ return availablePermits;
+ }
+ consumerForUuid = consumer;
+ uuidToConsumer.put(uuid, consumerForUuid);
+ }
+
+ final int permits = consumerToPermits.computeIfAbsent(consumerForUuid,
Consumer::getAvailablePermits);
+ if (permits <= 0) {
+ unassignedMessageProcessor.accept(entryAndMetadata);
+ return availablePermits;
+ }
+ if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+ // The last chunk is received, we should remove the uuid from the
cache.
+ uuidToConsumer.remove(uuid);
+ }
+
+ addEntry(consumerToEntries, consumerForUuid, entryAndMetadata);
+ consumerToPermits.put(consumerForUuid, permits - 1);
+ if (consumerForUuid == consumer) {
+ return availablePermits - 1;
+ }
+ return availablePermits;
+ }
+
+ private boolean isChunkedMessage(MessageMetadata metadata) {
+ return metadata != null && metadata.hasUuid() && metadata.hasChunkId()
&& metadata.hasNumChunksFromMsg();
+ }
+
+ private void addEntry(Map<Consumer, List<EntryAndMetadata>>
consumerToEntries, Consumer consumer,
+ EntryAndMetadata entry) {
+ consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entry);
+ }
+
+ private boolean skipChunk(EntryAndMetadata entryAndMetadata,
MessageMetadata metadata) {
+ if (metadata.getChunkId() != 0) {
+ if (subscription != null) {
+ log.warn("[{}][{}] Skip the message because it is not the
first chunk."
+ + " Position: {}, UUID: {}, ChunkId: {},
NumChunksFromMsg: {}",
+ subscription.getTopicName(), subscription.getName(),
entryAndMetadata.getPosition(),
+ metadata.getUuid(), metadata.getChunkId(),
metadata.getNumChunksFromMsg());
+ // Directly ack the message.
+ if (!(subscription instanceof PulsarCompactorSubscription)) {
+ subscription.acknowledgeMessage(Collections.singletonList(
+ entryAndMetadata.getPosition()),
AckType.Individual, Collections.emptyMap());
+ }
+ }
+ entryAndMetadata.release();
+ return true;
Review Comment:
Since there's a potential for data loss, the messages should be acknowledged
only if `autoSkipNonRecoverableData` is set in `broker.conf`. When
`autoSkipNonRecoverableData` isn't set, the message shouldn't get acknowledged
and the log message should be logged at `ERROR` level. I think it's fine to skip
the message in that case so that processing of the subscription continues, but
a backlog will be left behind due to the unacknowledged messages.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -106,6 +105,64 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
return consumerToEntries;
}
+ private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata
metadata, Consumer consumer,
+ Map<Consumer, List<EntryAndMetadata>> consumerToEntries, int
availablePermits) {
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
+ if (consumerForUuid == null) {
+ if (skipChunk(entryAndMetadata, metadata)) {
+ return availablePermits;
+ }
+ consumerForUuid = consumer;
+ uuidToConsumer.put(uuid, consumerForUuid);
+ }
+
+ final int permits = consumerToPermits.computeIfAbsent(consumerForUuid,
Consumer::getAvailablePermits);
+ if (permits <= 0) {
+ unassignedMessageProcessor.accept(entryAndMetadata);
+ return availablePermits;
+ }
+ if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
+ // The last chunk is received, we should remove the uuid from the
cache.
+ uuidToConsumer.remove(uuid);
+ }
+
+ addEntry(consumerToEntries, consumerForUuid, entryAndMetadata);
+ consumerToPermits.put(consumerForUuid, permits - 1);
+ if (consumerForUuid == consumer) {
+ return availablePermits - 1;
+ }
+ return availablePermits;
+ }
+
+ private boolean isChunkedMessage(MessageMetadata metadata) {
+ return metadata != null && metadata.hasUuid() && metadata.hasChunkId()
&& metadata.hasNumChunksFromMsg();
+ }
+
+ private void addEntry(Map<Consumer, List<EntryAndMetadata>>
consumerToEntries, Consumer consumer,
+ EntryAndMetadata entry) {
+ consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entry);
+ }
+
+ private boolean skipChunk(EntryAndMetadata entryAndMetadata,
MessageMetadata metadata) {
Review Comment:
`skipChunk` sounds like a command; it's better to rename it so that it reads
like a query.
```suggestion
private boolean shouldSkipChunk(EntryAndMetadata entryAndMetadata,
MessageMetadata metadata) {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java:
##########
@@ -106,6 +105,64 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata>
return consumerToEntries;
}
+ private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata
metadata, Consumer consumer,
+ Map<Consumer, List<EntryAndMetadata>> consumerToEntries, int
availablePermits) {
+ final String uuid = metadata.getUuid();
+ Consumer consumerForUuid = uuidToConsumer.get(uuid);
+ if (consumerForUuid == null) {
+ if (skipChunk(entryAndMetadata, metadata)) {
+ return availablePermits;
+ }
+ consumerForUuid = consumer;
+ uuidToConsumer.put(uuid, consumerForUuid);
+ }
Review Comment:
It seems that this solution would only skip the first entry of possibly
multiple chunk entries.
For example, suppose the entry with chunkId 0 got lost and there are subsequent
entries with chunkId 1, chunkId 2 and chunkId 3. The entries with chunkId 2 and
chunkId 3 would get delivered to the client, causing a similar issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]