This is an automated email from the ASF dual-hosted git repository. xikai pushed a commit to branch memtable-poc in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
commit 61ef0338f3a0f3aeb063acc570232a8dfb421189 Author: xikai.wxk <[email protected]> AuthorDate: Thu Dec 21 16:51:39 2023 +0800 feat: wait for write notification in io runtime --- analytic_engine/src/instance/mod.rs | 5 +++++ analytic_engine/src/table/mod.rs | 27 +++++++++++++-------------- table_engine/src/table.rs | 3 +++ 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 9caa365a..75408a86 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -297,6 +297,11 @@ impl Instance { &self.runtimes.write_runtime } + #[inline] + pub fn io_runtime(&self) -> &Arc<Runtime> { + &self.runtimes.io_runtime + } + #[inline] fn make_flusher(&self) -> Flusher { Flusher { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 2947678a..cb476f57 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -52,9 +52,9 @@ use table_engine::{ stream::{PartitionedStreams, SendableRecordBatchStream}, table::{ AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get, - GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadRequest, - Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, WaitForPendingWrites, - Write, WriteRequest, + GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, Join, MergeWrite, ReadOptions, + ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, + WaitForPendingWrites, Write, WriteRequest, }, ANALYTIC_ENGINE_TYPE, }; @@ -336,17 +336,16 @@ impl TableImpl { .metrics .start_table_write_queue_waiter_timer(); - // We have ever observed that `rx` is closed in production but it is impossible - // in theory(especially after warping actual write by - // `CancellationSafeFuture`). So we also warp `rx` by - // `CancellationSafeFuture` for not just retrying but better observing. - match CancellationSafeFuture::new( - rx, - "pending_queue_waiter", - self.instance.write_runtime().clone(), - ) - .await - { + let io_runtime = self.instance.io_runtime().clone(); + // We have ever observed that `rx` is closed in production but it is + // impossible in theory(especially after warping + // actual write by `CancellationSafeFuture`). So we + // also warp `rx` by `CancellationSafeFuture` for + // not just retrying but better observing. + let wait_notify = + CancellationSafeFuture::new(rx, "pending_queue_waiter", io_runtime); + let join_res = self.instance.io_runtime().spawn(wait_notify); + match join_res.await.context(Join)? { Ok(res) => { res.box_err().context(Write { table: self.name() })?; Ok(num_rows) diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 6c380441..a3793a6a 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -157,6 +157,9 @@ pub enum Error { #[snafu(display("Reject for too many pending writes, table:{table}"))] TooManyPendingWrites { table: String }, + #[snafu(display("Failed to join, err:{source}"))] + Join { source: runtime::Error }, + #[snafu(display("Failed to do merge write, msg:{}", msg))] MergeWrite { msg: String }, } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
