jiacai2050 commented on code in PR #1572:
URL: https://github.com/apache/horaedb/pull/1572#discussion_r1777943713


##########
src/analytic_engine/src/instance/wal_replayer.rs:
##########
@@ -381,49 +381,53 @@ impl RegionBasedReplay {
     async fn replay_single_batch(
         context: &ReplayContext,
         log_batch: &VecDeque<LogEntry<ReadPayload>>,
-        serial_exec_ctxs: &Arc<Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
+        serial_exec_ctxs: &HashMap<TableId, Mutex<SerialExecContext<'_>>>,
         failed_tables: &mut FailedTables,
     ) -> Result<()> {
         let mut table_batches = Vec::new();
        // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
         Self::split_log_batch_by_table(log_batch, &mut table_batches);
 
-        // TODO: Replay logs of different tables in parallel.
-        let mut replay_tasks = Vec::with_capacity(table_batches.len());
-        for table_batch in table_batches {
-            // Some tables may have failed in previous replay, ignore them.
-            if failed_tables.contains_key(&table_batch.table_id) {
-                continue;
-            }
-            let log_entries: Vec<_> = table_batch
-                .ranges
-                .iter()
-                .flat_map(|range| log_batch.range(range.clone()))
-                .collect();
-
-            let serial_exec_ctxs = serial_exec_ctxs.clone();
-            replay_tasks.push(async move {
-                // Some tables may have been moved to other shards or dropped, 
ignore such logs.
-                if let Some(ctx) = 
serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
-                    let result = replay_table_log_entries(
-                        &context.flusher,
-                        context.max_retry_flush_limit,
-                        &mut ctx.serial_exec,
-                        &ctx.table_data,
-                        log_entries.into_iter(),
-                    )
-                    .await;
-                    (table_batch.table_id, Some(result))
-                } else {
-                    (table_batch.table_id, None)
+        let ((), results) = async_scoped::TokioScope::scope_and_block(|scope| {
+            // Run at most 20 tasks in parallel
+            let semaphore = Arc::new(Semaphore::new(20));

Review Comment:
   Define `20` as a named `const` so that it can be reused elsewhere instead of being a magic number.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to