This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 559dfce2 fix: logs might be missed during RegionBased replay in the 
WAL based on local disk (#1570)
559dfce2 is described below

commit 559dfce20e40ed46577170efca02e5dd292e7cdf
Author: Draco <[email protected]>
AuthorDate: Fri Sep 20 14:19:21 2024 +0800

    fix: logs might be missed during RegionBased replay in the WAL based on 
local disk (#1570)
    
    ## Rationale
    In RegionBased replay, a batch of logs is first scanned from the WAL,
    and then replayed on various tables using multiple threads. This
    approach works fine for WALs based on tables, as the logs for each table
    are clustered together. However, in a WAL based on local disk, the logs
    for each table may be scattered across different positions within the
    batch. During multi-threaded replay, it is possible that for a given
    table, a later log (log2) is replayed before an earlier one (log1),
    resulting in missed logs.
    
    ## Detailed Changes
    1. Modify the `split_log_batch_by_table` function to aggregate all logs
    for a table together.
    2. Modify the `TableBatch` struct to hold a `Vec<Range<usize>>` instead
    of a single range.
    
    ## Test Plan
    Manual testing.
---
 src/analytic_engine/src/instance/wal_replayer.rs | 36 ++++++++++++++++--------
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/src/analytic_engine/src/instance/wal_replayer.rs 
b/src/analytic_engine/src/instance/wal_replayer.rs
index 251797bf..f7828951 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -395,6 +395,11 @@ impl RegionBasedReplay {
             if failed_tables.contains_key(&table_batch.table_id) {
                 continue;
             }
+            let log_entries: Vec<_> = table_batch
+                .ranges
+                .iter()
+                .flat_map(|range| log_batch.range(range.clone()))
+                .collect();
 
             let serial_exec_ctxs = serial_exec_ctxs.clone();
             replay_tasks.push(async move {
@@ -405,7 +410,7 @@ impl RegionBasedReplay {
                         context.max_retry_flush_limit,
                         &mut ctx.serial_exec,
                         &ctx.table_data,
-                        log_batch.range(table_batch.range),
+                        log_entries.into_iter(),
                     )
                     .await;
                     (table_batch.table_id, Some(result))
@@ -439,13 +444,14 @@ impl RegionBasedReplay {
 
         // Split log batch by table id, for example:
         // input batch:
-        //  |1|1|2|2|2|3|3|3|3|
+        //  |1|1|2|2|2|3|3|3|3|1|1|
         //
         // output batches:
-        //  |1|1|, |2|2|2|, |3|3|3|3|
+        //  |1|1|1|1|, |2|2|2|, |3|3|3|3|
         let mut start_log_idx = 0usize;
         let mut curr_log_idx = 0usize;
         let mut start_table_id = 
log_batch.get(start_log_idx).unwrap().table_id;
+        let mut table_ranges = HashMap::new();
         loop {
             let time_to_break = curr_log_idx == log_batch.len();
             let found_end_idx = if time_to_break {
@@ -456,10 +462,10 @@ impl RegionBasedReplay {
             };
 
             if found_end_idx {
-                table_batches.push(TableBatch {
-                    table_id: TableId::new(start_table_id),
-                    range: start_log_idx..curr_log_idx,
-                });
+                table_ranges
+                    .entry(TableId::new(start_table_id))
+                    .or_insert(Vec::new())
+                    .push(start_log_idx..curr_log_idx);
 
                 // Step to next start idx.
                 start_log_idx = curr_log_idx;
@@ -476,13 +482,16 @@ impl RegionBasedReplay {
             }
             curr_log_idx += 1;
         }
+        for (table_id, ranges) in table_ranges {
+            table_batches.push(TableBatch { table_id, ranges });
+        }
     }
 }
 
 #[derive(Debug, Eq, PartialEq)]
 struct TableBatch {
     table_id: TableId,
-    range: Range<usize>,
+    ranges: Vec<Range<usize>>,
 }
 
 struct SerialExecContext<'a> {
@@ -622,6 +631,7 @@ mod tests {
         }
     }
 
+    #[allow(clippy::single_range_in_vec_init)]
     fn test_set() -> Vec<(VecDeque<LogEntry<u32>>, Vec<TableBatch>)> {
         let test_log_batch1: VecDeque<LogEntry<u32>> = VecDeque::from([
             LogEntry {
@@ -658,15 +668,15 @@ mod tests {
         let expected1 = vec![
             TableBatch {
                 table_id: TableId::new(0),
-                range: 0..3,
+                ranges: vec![0..3],
             },
             TableBatch {
                 table_id: TableId::new(1),
-                range: 3..5,
+                ranges: vec![3..5],
             },
             TableBatch {
                 table_id: TableId::new(2),
-                range: 5..6,
+                ranges: vec![5..6],
             },
         ];
 
@@ -677,7 +687,7 @@ mod tests {
         }]);
         let expected2 = vec![TableBatch {
             table_id: TableId::new(0),
-            range: 0..1,
+            ranges: vec![0..1],
         }];
 
         let test_log_batch3: VecDeque<LogEntry<u32>> = VecDeque::default();
@@ -693,6 +703,8 @@ mod tests {
     fn check_split_result(batch: &VecDeque<LogEntry<u32>>, expected: 
&[TableBatch]) {
         let mut table_batches = Vec::new();
         RegionBasedReplay::split_log_batch_by_table(batch, &mut table_batches);
+        // split_log_batch_by_table returns unordered results, so sort it here.
+        table_batches.sort_by_key(|tb| tb.table_id);
         assert_eq!(&table_batches, expected);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to