This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 66d7a0d3 feat: replay logs of different tables in parallel (#1492)
66d7a0d3 is described below
commit 66d7a0d3d1a59743ccefbb248165c28b78cdf0bb
Author: Lethannn <[email protected]>
AuthorDate: Mon Mar 11 11:43:28 2024 +0800
feat: replay logs of different tables in parallel (#1492)
## Rationale
Related to https://github.com/apache/incubator-horaedb/issues/1466
## Detailed Changes
Replay logs of different tables in parallel
## Test Plan
CI
---------
Co-authored-by: jiacai2050 <[email protected]>
---
Cargo.lock | 35 ++++++++++------
src/analytic_engine/src/instance/wal_replayer.rs | 53 ++++++++++++++----------
2 files changed, 52 insertions(+), 36 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index d7fad18c..fca1cab2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1972,15 +1972,15 @@ dependencies = [
[[package]]
name = "dashmap"
-version = "5.4.0"
+version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
+checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if 1.0.0",
- "hashbrown 0.12.3",
+ "hashbrown 0.14.0",
"lock_api",
"once_cell",
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
]
[[package]]
@@ -3750,9 +3750,9 @@ dependencies = [
[[package]]
name = "lock_api"
-version = "0.4.9"
+version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
+checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
dependencies = [
"autocfg",
"scopeguard",
@@ -4551,7 +4551,7 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
dependencies = [
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
]
[[package]]
@@ -4626,7 +4626,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
]
[[package]]
@@ -4645,15 +4645,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
-version = "0.9.7"
+version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
+checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if 1.0.0",
"libc",
- "redox_syscall 0.2.16",
+ "redox_syscall 0.4.1",
"smallvec",
- "windows-sys 0.45.0",
+ "windows-targets 0.48.1",
]
[[package]]
@@ -5724,6 +5724,15 @@ dependencies = [
"bitflags 1.3.2",
]
+[[package]]
+name = "redox_syscall"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
[[package]]
name = "redox_users"
version = "0.4.3"
@@ -8384,7 +8393,7 @@ dependencies = [
"fail",
"lazy_static",
"num_cpus",
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
"prometheus 0.13.3",
"rand 0.8.5",
]
diff --git a/src/analytic_engine/src/instance/wal_replayer.rs
b/src/analytic_engine/src/instance/wal_replayer.rs
index 0aea4f2e..78792541 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -29,13 +29,14 @@ use common_types::{
schema::{IndexInWriterSchema, Schema},
table::ShardId,
};
+use futures::StreamExt;
use generic_error::BoxError;
use lazy_static::lazy_static;
use logger::{debug, error, info, trace, warn};
use prometheus::{exponential_buckets, register_histogram, Histogram};
use snafu::ResultExt;
use table_engine::table::TableId;
-use tokio::sync::MutexGuard;
+use tokio::sync::{Mutex, MutexGuard};
use wal::{
log_batch::LogEntry,
manager::{
@@ -335,6 +336,7 @@ impl RegionBasedReplay {
let schema_provider = TableSchemaProviderAdapter {
table_datas: table_datas_by_id.clone(),
};
+ let serial_exec_ctxs = Arc::new(Mutex::new(serial_exec_ctxs));
// Split and replay logs.
loop {
let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
@@ -352,13 +354,8 @@ impl RegionBasedReplay {
}
let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
- Self::replay_single_batch(
- context,
- &log_entry_buf,
- &mut serial_exec_ctxs,
- failed_tables,
- )
- .await?;
+ Self::replay_single_batch(context, &log_entry_buf,
&serial_exec_ctxs, failed_tables)
+ .await?;
}
Ok(())
@@ -367,7 +364,7 @@ impl RegionBasedReplay {
async fn replay_single_batch(
context: &ReplayContext,
log_batch: &VecDeque<LogEntry<ReadPayload>>,
- serial_exec_ctxs: &mut HashMap<TableId, SerialExecContext<'_>>,
+ serial_exec_ctxs: &Arc<Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
failed_tables: &mut FailedTables,
) -> Result<()> {
let mut table_batches = Vec::new();
@@ -375,28 +372,38 @@ impl RegionBasedReplay {
Self::split_log_batch_by_table(log_batch, &mut table_batches);
// TODO: Replay logs of different tables in parallel.
+ let mut replay_tasks = Vec::with_capacity(table_batches.len());
for table_batch in table_batches {
// Some tables may have failed in previous replay, ignore them.
if failed_tables.contains_key(&table_batch.table_id) {
continue;
}
- // Replay all log entries of current table.
- // Some tables may have been moved to other shards or dropped,
ignore such logs.
- if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id)
{
- let result = replay_table_log_entries(
- &context.flusher,
- context.max_retry_flush_limit,
- &mut ctx.serial_exec,
- &ctx.table_data,
- log_batch.range(table_batch.range),
- )
- .await;
+ let serial_exec_ctxs = serial_exec_ctxs.clone();
+ replay_tasks.push(async move {
+ // Some tables may have been moved to other shards or dropped,
ignore such logs.
+ if let Some(ctx) =
serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
+ let result = replay_table_log_entries(
+ &context.flusher,
+ context.max_retry_flush_limit,
+ &mut ctx.serial_exec,
+ &ctx.table_data,
+ log_batch.range(table_batch.range),
+ )
+ .await;
+ (table_batch.table_id, Some(result))
+ } else {
+ (table_batch.table_id, None)
+ }
+ });
+ }
+ // Run at most 20 tasks in parallel
+ let mut replay_tasks =
futures::stream::iter(replay_tasks).buffer_unordered(20);
+ while let Some((table_id, ret)) = replay_tasks.next().await {
+ if let Some(Err(e)) = ret {
// If occur error, mark this table as failed and store the
cause.
- if let Err(e) = result {
- failed_tables.insert(table_batch.table_id, e);
- }
+ failed_tables.insert(table_id, e);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]