This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch memtable-poc
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git

commit 61ef0338f3a0f3aeb063acc570232a8dfb421189
Author: xikai.wxk <[email protected]>
AuthorDate: Thu Dec 21 16:51:39 2023 +0800

    feat: wait for write notification in io runtime
---
 analytic_engine/src/instance/mod.rs |  5 +++++
 analytic_engine/src/table/mod.rs    | 27 +++++++++++++--------------
 table_engine/src/table.rs           |  3 +++
 3 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs
index 9caa365a..75408a86 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -297,6 +297,11 @@ impl Instance {
         &self.runtimes.write_runtime
     }
 
+    #[inline]
+    pub fn io_runtime(&self) -> &Arc<Runtime> {
+        &self.runtimes.io_runtime
+    }
+
     #[inline]
     fn make_flusher(&self) -> Flusher {
         Flusher {
diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs
index 2947678a..cb476f57 100644
--- a/analytic_engine/src/table/mod.rs
+++ b/analytic_engine/src/table/mod.rs
@@ -52,9 +52,9 @@ use table_engine::{
     stream::{PartitionedStreams, SendableRecordBatchStream},
     table::{
         AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get,
-        GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadRequest,
-        Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, WaitForPendingWrites,
-        Write, WriteRequest,
+        GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, Join, MergeWrite, ReadOptions,
+        ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites,
+        WaitForPendingWrites, Write, WriteRequest,
     },
     ANALYTIC_ENGINE_TYPE,
 };
@@ -336,17 +336,16 @@ impl TableImpl {
                     .metrics
                     .start_table_write_queue_waiter_timer();
 
-                // We have ever observed that `rx` is closed in production but it is impossible
-                // in theory(especially after warping actual write by
-                // `CancellationSafeFuture`). So we also warp `rx` by
-                // `CancellationSafeFuture` for not just retrying but better observing.
-                match CancellationSafeFuture::new(
-                    rx,
-                    "pending_queue_waiter",
-                    self.instance.write_runtime().clone(),
-                )
-                .await
-                {
+                let io_runtime = self.instance.io_runtime().clone();
+                // We have ever observed that `rx` is closed in production but it is
+                // impossible in theory(especially after warping
+                // actual write by `CancellationSafeFuture`). So we
+                // also warp `rx` by `CancellationSafeFuture` for
+                // not just retrying but better observing.
+                let wait_notify =
+                    CancellationSafeFuture::new(rx, "pending_queue_waiter", io_runtime);
+                let join_res = self.instance.io_runtime().spawn(wait_notify);
+                match join_res.await.context(Join)? {
                     Ok(res) => {
                         res.box_err().context(Write { table: self.name() })?;
                         Ok(num_rows)
diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs
index 6c380441..a3793a6a 100644
--- a/table_engine/src/table.rs
+++ b/table_engine/src/table.rs
@@ -157,6 +157,9 @@ pub enum Error {
     #[snafu(display("Reject for too many pending writes, table:{table}"))]
     TooManyPendingWrites { table: String },
 
+    #[snafu(display("Failed to join, err:{source}"))]
+    Join { source: runtime::Error },
+
     #[snafu(display("Failed to do merge write, msg:{}", msg))]
     MergeWrite { msg: String },
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to