jiacai2050 commented on code in PR #1492:
URL: 
https://github.com/apache/incubator-horaedb/pull/1492#discussion_r1517292652


##########
src/analytic_engine/src/instance/wal_replayer.rs:
##########
@@ -374,31 +376,54 @@ impl RegionBasedReplay {
         // TODO: No `group_by` method in `VecDeque`, so implement it manually 
here...
         Self::split_log_batch_by_table(log_batch, &mut table_batches);
 
-        // TODO: Replay logs of different tables in parallel.
-        for table_batch in table_batches {
-            // Some tables may have failed in previous replay, ignore them.
-            if failed_tables.contains_key(&table_batch.table_id) {
-                continue;
-            }
-
-            // Replay all log entries of current table.
-            // Some tables may have been moved to other shards or dropped, 
ignore such logs.
-            if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) 
{
-                let result = replay_table_log_entries(
-                    &context.flusher,
-                    context.max_retry_flush_limit,
-                    &mut ctx.serial_exec,
-                    &ctx.table_data,
-                    log_batch.range(table_batch.range),
-                )
-                .await;
-
-                // If occur error, mark this table as failed and store the 
cause.
-                if let Err(e) = result {
-                    failed_tables.insert(table_batch.table_id, e);
+        let alter_failed_tables = HashMap::new();
+        let alter_failed_tables_ref = 
Arc::new(Mutex::new(alter_failed_tables));
+
+        let mut serial_exec_ctxs_dash_map = DashMap::new();

Review Comment:
   This map seems unnecessary; what I have in mind is something like this:
   ```diff
   modified   src/analytic_engine/src/instance/wal_replayer.rs
   @@ -29,6 +29,7 @@ use common_types::{
        schema::{IndexInWriterSchema, Schema},
        table::ShardId,
    };
   +use futures::StreamExt;
    use generic_error::BoxError;
    use lazy_static::lazy_static;
    use logger::{debug, error, info, trace, warn};
   @@ -374,6 +375,7 @@ impl RegionBasedReplay {
            // TODO: No `group_by` method in `VecDeque`, so implement it 
manually here...
            Self::split_log_batch_by_table(log_batch, &mut table_batches);
    
   +        let mut replay_tasks = Vec::with_capacity(table_batches.len());
            // TODO: Replay logs of different tables in parallel.
            for table_batch in table_batches {
                // Some tables may have failed in previous replay, ignore them.
   @@ -384,22 +386,27 @@ impl RegionBasedReplay {
                // Replay all log entries of current table.
                // Some tables may have been moved to other shards or dropped, 
ignore such logs.
                if let Some(ctx) = 
serial_exec_ctxs.get_mut(&table_batch.table_id) {
   -                let result = replay_table_log_entries(
   +                replay_tasks.push(replay_table_log_entries(
                        &context.flusher,
                        context.max_retry_flush_limit,
                        &mut ctx.serial_exec,
                        &ctx.table_data,
                        log_batch.range(table_batch.range),
   -                )
   -                .await;
   +                ));
    
   -                // If occur error, mark this table as failed and store the 
cause.
   -                if let Err(e) = result {
   -                    failed_tables.insert(table_batch.table_id, e);
   -                }
   +                // If occur error, mark this table as failed and store the
   +                // cause. if let Err(e) = result {
   +                //     failed_tables.insert(table_batch.table_id, e);
   +                // }
                }
            }
   -
   +        for ret in futures::stream::iter(replay_tasks)
   +            .buffer_unordered(20)
   +            .collect::<Vec<_>>()
   +            .await
   +        {
   +            // insert into failed_tables if there are errors
   +        }
            Ok(())
        }
   
   ```
   
   But this fails to compile because `serial_exec_ctxs` is mutably borrowed more than once:
   
   ```
   error[E0499]: cannot borrow `*serial_exec_ctxs` as mutable more than once at 
a time
      --> src/analytic_engine/src/instance/wal_replayer.rs:388:32
       |
   388 |             if let Some(ctx) = 
serial_exec_ctxs.get_mut(&table_batch.table_id) {
       |                                ^^^^^^^^^^^^^^^^ `*serial_exec_ctxs` 
was mutably borrowed here in the previous iteration of the loop
   ...
   403 |         for ret in futures::stream::iter(replay_tasks)
       |                                          ------------ first borrow 
used here, in later iteration of loop
   
   
   ```
   
   So the first step for this task is to remove those mutable references.
   
   The fix should be easy: just define `serial_exec_ctxs` with type 
`Arc<Mutex<HashMap>>`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to