This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 66d7a0d3 feat: replay logs of different tables in parallel (#1492)
66d7a0d3 is described below

commit 66d7a0d3d1a59743ccefbb248165c28b78cdf0bb
Author: Lethannn <[email protected]>
AuthorDate: Mon Mar 11 11:43:28 2024 +0800

    feat: replay logs of different tables in parallel (#1492)
    
    ## Rationale
    
    Related to https://github.com/apache/incubator-horaedb/issues/1466
    
    ## Detailed Changes
    Replay logs of different tables in parallel
    
    ## Test Plan
    CI
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 Cargo.lock                                       | 35 ++++++++++------
 src/analytic_engine/src/instance/wal_replayer.rs | 53 ++++++++++++++----------
 2 files changed, 52 insertions(+), 36 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d7fad18c..fca1cab2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1972,15 +1972,15 @@ dependencies = [
 
 [[package]]
 name = "dashmap"
-version = "5.4.0"
+version = "5.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
+checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
 dependencies = [
  "cfg-if 1.0.0",
- "hashbrown 0.12.3",
+ "hashbrown 0.14.0",
  "lock_api",
  "once_cell",
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
 ]
 
 [[package]]
@@ -3750,9 +3750,9 @@ dependencies = [
 
 [[package]]
 name = "lock_api"
-version = "0.4.9"
+version = "0.4.11"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
+checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
 dependencies = [
  "autocfg",
  "scopeguard",
@@ -4551,7 +4551,7 @@ version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 dependencies = [
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
 ]
 
 [[package]]
@@ -4626,7 +4626,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
 dependencies = [
  "lock_api",
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
 ]
 
 [[package]]
@@ -4645,15 +4645,15 @@ dependencies = [
 
 [[package]]
 name = "parking_lot_core"
-version = "0.9.7"
+version = "0.9.9"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
+checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
 dependencies = [
  "cfg-if 1.0.0",
  "libc",
- "redox_syscall 0.2.16",
+ "redox_syscall 0.4.1",
  "smallvec",
- "windows-sys 0.45.0",
+ "windows-targets 0.48.1",
 ]
 
 [[package]]
@@ -5724,6 +5724,15 @@ dependencies = [
  "bitflags 1.3.2",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
 [[package]]
 name = "redox_users"
 version = "0.4.3"
@@ -8384,7 +8393,7 @@ dependencies = [
  "fail",
  "lazy_static",
  "num_cpus",
- "parking_lot_core 0.9.7",
+ "parking_lot_core 0.9.9",
  "prometheus 0.13.3",
  "rand 0.8.5",
 ]
diff --git a/src/analytic_engine/src/instance/wal_replayer.rs 
b/src/analytic_engine/src/instance/wal_replayer.rs
index 0aea4f2e..78792541 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -29,13 +29,14 @@ use common_types::{
     schema::{IndexInWriterSchema, Schema},
     table::ShardId,
 };
+use futures::StreamExt;
 use generic_error::BoxError;
 use lazy_static::lazy_static;
 use logger::{debug, error, info, trace, warn};
 use prometheus::{exponential_buckets, register_histogram, Histogram};
 use snafu::ResultExt;
 use table_engine::table::TableId;
-use tokio::sync::MutexGuard;
+use tokio::sync::{Mutex, MutexGuard};
 use wal::{
     log_batch::LogEntry,
     manager::{
@@ -335,6 +336,7 @@ impl RegionBasedReplay {
         let schema_provider = TableSchemaProviderAdapter {
             table_datas: table_datas_by_id.clone(),
         };
+        let serial_exec_ctxs = Arc::new(Mutex::new(serial_exec_ctxs));
         // Split and replay logs.
         loop {
             let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
@@ -352,13 +354,8 @@ impl RegionBasedReplay {
             }
 
             let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
-            Self::replay_single_batch(
-                context,
-                &log_entry_buf,
-                &mut serial_exec_ctxs,
-                failed_tables,
-            )
-            .await?;
+            Self::replay_single_batch(context, &log_entry_buf, 
&serial_exec_ctxs, failed_tables)
+                .await?;
         }
 
         Ok(())
@@ -367,7 +364,7 @@ impl RegionBasedReplay {
     async fn replay_single_batch(
         context: &ReplayContext,
         log_batch: &VecDeque<LogEntry<ReadPayload>>,
-        serial_exec_ctxs: &mut HashMap<TableId, SerialExecContext<'_>>,
+        serial_exec_ctxs: &Arc<Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
         failed_tables: &mut FailedTables,
     ) -> Result<()> {
         let mut table_batches = Vec::new();
@@ -375,28 +372,38 @@ impl RegionBasedReplay {
         Self::split_log_batch_by_table(log_batch, &mut table_batches);
 
         // TODO: Replay logs of different tables in parallel.
+        let mut replay_tasks = Vec::with_capacity(table_batches.len());
         for table_batch in table_batches {
             // Some tables may have failed in previous replay, ignore them.
             if failed_tables.contains_key(&table_batch.table_id) {
                 continue;
             }
 
-            // Replay all log entries of current table.
-            // Some tables may have been moved to other shards or dropped, 
ignore such logs.
-            if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) 
{
-                let result = replay_table_log_entries(
-                    &context.flusher,
-                    context.max_retry_flush_limit,
-                    &mut ctx.serial_exec,
-                    &ctx.table_data,
-                    log_batch.range(table_batch.range),
-                )
-                .await;
+            let serial_exec_ctxs = serial_exec_ctxs.clone();
+            replay_tasks.push(async move {
+                // Some tables may have been moved to other shards or dropped, 
ignore such logs.
+                if let Some(ctx) = 
serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
+                    let result = replay_table_log_entries(
+                        &context.flusher,
+                        context.max_retry_flush_limit,
+                        &mut ctx.serial_exec,
+                        &ctx.table_data,
+                        log_batch.range(table_batch.range),
+                    )
+                    .await;
+                    (table_batch.table_id, Some(result))
+                } else {
+                    (table_batch.table_id, None)
+                }
+            });
+        }
 
+        // Run at most 20 tasks in parallel
+        let mut replay_tasks = 
futures::stream::iter(replay_tasks).buffer_unordered(20);
+        while let Some((table_id, ret)) = replay_tasks.next().await {
+            if let Some(Err(e)) = ret {
                 // If occur error, mark this table as failed and store the 
cause.
-                if let Err(e) = result {
-                    failed_tables.insert(table_batch.table_id, e);
-                }
+                failed_tables.insert(table_id, e);
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to