zhaohai666 commented on code in PR #1211:
URL: https://github.com/apache/rocketmq-clients/pull/1211#discussion_r3008107971
##########
nodejs/src/consumer/FifoConsumeService.ts:
##########
@@ -21,12 +21,105 @@ import { MessageListener } from './MessageListener';
import type { ProcessQueue } from './ProcessQueue';
export class FifoConsumeService extends ConsumeService {
- constructor(clientId: string, messageListener: MessageListener) {
+ readonly #enableAccelerator: boolean;
+ readonly #groupProcessing = new Map<string /* messageGroup */,
Promise<void>>();
+
+ constructor(clientId: string, messageListener: MessageListener,
enableAccelerator?: boolean) {
super(clientId, messageListener);
+ this.#enableAccelerator = enableAccelerator ?? false;
}
consume(pq: ProcessQueue, messageViews: MessageView[]): void {
- this.#consumeIteratively(pq, messageViews, 0);
+ // Use iterative consumption when accelerator is disabled or only one
message
+ if (!this.#enableAccelerator || messageViews.length <= 1) {
+ this.#consumeIteratively(pq, messageViews, 0);
+ return;
+ }
+ this.#consumeWithAccelerator(pq, messageViews);
+ }
+
+ /**
+ * FIFO consume accelerator mode:
+ * - Messages with the same messageGroup are consumed sequentially
+ * - Messages with different messageGroups are consumed in parallel
+ * - Messages without messageGroup are consumed in parallel
+ */
+ #consumeWithAccelerator(pq: ProcessQueue, messageViews: MessageView[]): void
{
+ // Group messages by messageGroup
+ const groupedMessages = new Map<string, MessageView[]>();
+ const ungroupedMessages: MessageView[] = [];
+
+ for (const messageView of messageViews) {
+ if (messageView.corrupted) {
+ pq.discardFifoMessage(messageView);
+ continue;
+ }
+
+ const messageGroup = messageView.messageGroup || '';
+ if (messageGroup) {
+ const group = groupedMessages.get(messageGroup) || [];
+ group.push(messageView);
+ groupedMessages.set(messageGroup, group);
+ } else {
+ ungroupedMessages.push(messageView);
+ }
+ }
+
+ // Log parallel consumption info
+ const groupCount = groupedMessages.size + (ungroupedMessages.length > 0 ?
1 : 0);
+ console.debug('FifoConsumeService parallel consume, messageViewsNum=%d,
groupNum=%d',
+ messageViews.length, groupCount);
+
+ // Process grouped messages (each group sequentially, groups in parallel)
+ for (const [ , messages ] of groupedMessages.entries()) {
+ this.#processMessageGroup(pq, messages);
+ }
+
+ // Process ungrouped messages in parallel
+ for (const messageView of ungroupedMessages) {
+ this.consumeMessage(messageView)
+ .then(result => pq.eraseFifoMessage(messageView, result))
+ .catch(() => {
+ // Error already logged, continue with next message
+ });
+ }
+ }
+
+ /**
+ * Process messages within the same group sequentially
+ */
+ async #processMessageGroup(pq: ProcessQueue, messages: MessageView[]):
Promise<void> {
+ // Check if there's already processing happening for this group
+ const messageGroup = messages[0]?.messageGroup || 'NO_GROUP';
+ const existingPromise = this.#groupProcessing.get(messageGroup);
+ const processTask = (async () => {
+ // Wait for previous processing to complete if any
+ if (existingPromise) {
+ await existingPromise.catch(() => {
+ // Ignore previous errors, continue processing
+ });
+ }
+
+ // Process messages in sequence
+ for (const messageView of messages) {
+ try {
+ const result = await this.consumeMessage(messageView);
+ await pq.eraseFifoMessage(messageView, result);
Review Comment:
已修复
##########
nodejs/src/producer/Producer.ts:
##########
@@ -333,4 +336,53 @@ export class Producer extends BaseClient {
#getRetryPolicy() {
return this.#publishingSettings.getRetryPolicy()!;
}
+
+ /**
+ * Recalls a scheduled/delayed message based on the topic and recall handle.
+ * This operation requires server support and can only be performed before
the message is delivered.
+ *
+ * @param topic - The topic associated with the scheduled message to be
canceled.
+ * @param recallHandle - A unique handle to identify the message to recall
(obtained from SendReceipt).
+ * @returns Promise resolving to RecallReceipt containing the recalled
message ID.
+ * @throws Error if producer is not running or recall handle is invalid.
+ */
+ async recallMessage(topic: string, recallHandle: string):
Promise<RecallReceipt> {
+ if (!this.isRunning()) {
+ this.logger.error('Unable to recall message because producer is not
running, clientId=%s', this.clientId);
+ throw new Error('Producer is not running now');
+ }
+
Review Comment:
已修复
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]