zk-drizzle commented on code in PR #1211:
URL: https://github.com/apache/rocketmq-clients/pull/1211#discussion_r3007801560
##########
nodejs/src/consumer/FifoConsumeService.ts:
##########
@@ -21,12 +21,105 @@ import { MessageListener } from './MessageListener';
import type { ProcessQueue } from './ProcessQueue';
export class FifoConsumeService extends ConsumeService {
- constructor(clientId: string, messageListener: MessageListener) {
+ readonly #enableAccelerator: boolean;
+ readonly #groupProcessing = new Map<string /* messageGroup */,
Promise<void>>();
+
+ constructor(clientId: string, messageListener: MessageListener,
enableAccelerator?: boolean) {
super(clientId, messageListener);
+ this.#enableAccelerator = enableAccelerator ?? false;
}
consume(pq: ProcessQueue, messageViews: MessageView[]): void {
- this.#consumeIteratively(pq, messageViews, 0);
+ // Use iterative consumption when accelerator is disabled or only one
message
+ if (!this.#enableAccelerator || messageViews.length <= 1) {
+ this.#consumeIteratively(pq, messageViews, 0);
+ return;
+ }
+ this.#consumeWithAccelerator(pq, messageViews);
+ }
+
+ /**
+ * FIFO consume accelerator mode:
+ * - Messages with the same messageGroup are consumed sequentially
+ * - Messages with different messageGroups are consumed in parallel
+ * - Messages without messageGroup are consumed in parallel
+ */
+ #consumeWithAccelerator(pq: ProcessQueue, messageViews: MessageView[]): void
{
+ // Group messages by messageGroup
+ const groupedMessages = new Map<string, MessageView[]>();
+ const ungroupedMessages: MessageView[] = [];
+
+ for (const messageView of messageViews) {
+ if (messageView.corrupted) {
+ pq.discardFifoMessage(messageView);
+ continue;
+ }
+
+ const messageGroup = messageView.messageGroup || '';
+ if (messageGroup) {
+ const group = groupedMessages.get(messageGroup) || [];
+ group.push(messageView);
+ groupedMessages.set(messageGroup, group);
+ } else {
+ ungroupedMessages.push(messageView);
+ }
+ }
+
+ // Log parallel consumption info
+ const groupCount = groupedMessages.size + (ungroupedMessages.length > 0 ?
1 : 0);
+ console.debug('FifoConsumeService parallel consume, messageViewsNum=%d,
groupNum=%d',
+ messageViews.length, groupCount);
+
+ // Process grouped messages (each group sequentially, groups in parallel)
+ for (const [ , messages ] of groupedMessages.entries()) {
+ this.#processMessageGroup(pq, messages);
+ }
+
+ // Process ungrouped messages in parallel
+ for (const messageView of ungroupedMessages) {
+ this.consumeMessage(messageView)
+ .then(result => pq.eraseFifoMessage(messageView, result))
+ .catch(() => {
+ // Error already logged, continue with next message
+ });
+ }
+ }
+
+ /**
+ * Process messages within the same group sequentially
+ */
+ async #processMessageGroup(pq: ProcessQueue, messages: MessageView[]):
Promise<void> {
+ // Check if there's already processing happening for this group
+ const messageGroup = messages[0]?.messageGroup || 'NO_GROUP';
+ const existingPromise = this.#groupProcessing.get(messageGroup);
+ const processTask = (async () => {
+ // Wait for previous processing to complete if any
+ if (existingPromise) {
+ await existingPromise.catch(() => {
+ // Ignore previous errors, continue processing
+ });
+ }
+
+ // Process messages in sequence
+ for (const messageView of messages) {
+ try {
+ const result = await this.consumeMessage(messageView);
+ await pq.eraseFifoMessage(messageView, result);
Review Comment:
Avoid using \console` for logging.`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]