This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 559dfce2 fix: logs might be missed during RegionBased replay in the
WAL based on local disk (#1570)
559dfce2 is described below
commit 559dfce20e40ed46577170efca02e5dd292e7cdf
Author: Draco <[email protected]>
AuthorDate: Fri Sep 20 14:19:21 2024 +0800
fix: logs might be missed during RegionBased replay in the WAL based on
local disk (#1570)
## Rationale
In RegionBased replay, a batch of logs is first scanned from the WAL and
then replayed onto the affected tables by multiple concurrent replay
tasks. This works for table-based WALs, because the logs of each table
are clustered together in the batch. In the local-disk-based WAL,
however, the logs of a table may be scattered across non-contiguous
positions within the batch, so `split_log_batch_by_table` produced
several separate batches for the same table. During concurrent replay,
a later log of a table (log2) could then be replayed before an earlier
one (log1), causing logs to be missed.
## Detailed Changes
1. Change `split_log_batch_by_table` to group all logs of a table into a
single `TableBatch`, even when those logs are not contiguous in the batch.
2. Change the `TableBatch` struct's single `range: Range<usize>` field
into `ranges: Vec<Range<usize>>`.
## Test Plan
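Manual testing. The existing unit tests for `split_log_batch_by_table`
were updated for the new `ranges` field and now sort the output by table
id, because the split result is no longer ordered.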
---
src/analytic_engine/src/instance/wal_replayer.rs | 36 ++++++++++++++++--------
1 file changed, 24 insertions(+), 12 deletions(-)
diff --git a/src/analytic_engine/src/instance/wal_replayer.rs
b/src/analytic_engine/src/instance/wal_replayer.rs
index 251797bf..f7828951 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -395,6 +395,11 @@ impl RegionBasedReplay {
if failed_tables.contains_key(&table_batch.table_id) {
continue;
}
+ let log_entries: Vec<_> = table_batch
+ .ranges
+ .iter()
+ .flat_map(|range| log_batch.range(range.clone()))
+ .collect();
let serial_exec_ctxs = serial_exec_ctxs.clone();
replay_tasks.push(async move {
@@ -405,7 +410,7 @@ impl RegionBasedReplay {
context.max_retry_flush_limit,
&mut ctx.serial_exec,
&ctx.table_data,
- log_batch.range(table_batch.range),
+ log_entries.into_iter(),
)
.await;
(table_batch.table_id, Some(result))
@@ -439,13 +444,14 @@ impl RegionBasedReplay {
// Split log batch by table id, for example:
// input batch:
- // |1|1|2|2|2|3|3|3|3|
+ // |1|1|2|2|2|3|3|3|3|1|1|
//
// output batches:
- // |1|1|, |2|2|2|, |3|3|3|3|
+ // |1|1|1|1|, |2|2|2|, |3|3|3|3|
let mut start_log_idx = 0usize;
let mut curr_log_idx = 0usize;
let mut start_table_id =
log_batch.get(start_log_idx).unwrap().table_id;
+ let mut table_ranges = HashMap::new();
loop {
let time_to_break = curr_log_idx == log_batch.len();
let found_end_idx = if time_to_break {
@@ -456,10 +462,10 @@ impl RegionBasedReplay {
};
if found_end_idx {
- table_batches.push(TableBatch {
- table_id: TableId::new(start_table_id),
- range: start_log_idx..curr_log_idx,
- });
+ table_ranges
+ .entry(TableId::new(start_table_id))
+ .or_insert(Vec::new())
+ .push(start_log_idx..curr_log_idx);
// Step to next start idx.
start_log_idx = curr_log_idx;
@@ -476,13 +482,16 @@ impl RegionBasedReplay {
}
curr_log_idx += 1;
}
+ for (table_id, ranges) in table_ranges {
+ table_batches.push(TableBatch { table_id, ranges });
+ }
}
}
#[derive(Debug, Eq, PartialEq)]
struct TableBatch {
table_id: TableId,
- range: Range<usize>,
+ ranges: Vec<Range<usize>>,
}
struct SerialExecContext<'a> {
@@ -622,6 +631,7 @@ mod tests {
}
}
+ #[allow(clippy::single_range_in_vec_init)]
fn test_set() -> Vec<(VecDeque<LogEntry<u32>>, Vec<TableBatch>)> {
let test_log_batch1: VecDeque<LogEntry<u32>> = VecDeque::from([
LogEntry {
@@ -658,15 +668,15 @@ mod tests {
let expected1 = vec![
TableBatch {
table_id: TableId::new(0),
- range: 0..3,
+ ranges: vec![0..3],
},
TableBatch {
table_id: TableId::new(1),
- range: 3..5,
+ ranges: vec![3..5],
},
TableBatch {
table_id: TableId::new(2),
- range: 5..6,
+ ranges: vec![5..6],
},
];
@@ -677,7 +687,7 @@ mod tests {
}]);
let expected2 = vec![TableBatch {
table_id: TableId::new(0),
- range: 0..1,
+ ranges: vec![0..1],
}];
let test_log_batch3: VecDeque<LogEntry<u32>> = VecDeque::default();
@@ -693,6 +703,8 @@ mod tests {
fn check_split_result(batch: &VecDeque<LogEntry<u32>>, expected:
&[TableBatch]) {
let mut table_batches = Vec::new();
RegionBasedReplay::split_log_batch_by_table(batch, &mut table_batches);
+ // split_log_batch_by_table returns unordered results, so sort it here.
+ table_batches.sort_by_key(|tb| tb.table_id);
assert_eq!(&table_batches, expected);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]