This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new 61b123ab refactor: avoid returning metrics in non-analyze sql (#1410)
61b123ab is described below

commit 61b123ab4006942bf4261e7880e1233d220f7deb
Author: 鲍金日 <[email protected]>
AuthorDate: Wed Jan 3 10:23:37 2024 +0800

    refactor: avoid returning metrics in non-analyze sql (#1410)
    
    ## Rationale
    In #1260, we implemented distributed analyze, but metrics were also
    returned for queries that are not analyze queries, which led to a
    decrease in query performance. This PR fixes that, so metrics are no
    longer returned for normal queries.
    
    ## Detailed Changes
    - Add is_analyze field to determine whether it is analyze
    
    ## Test Plan
    Existing tests
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 Cargo.lock                                         | 32 ++++++------
 Cargo.toml                                         |  2 +-
 df_engine_extensions/src/dist_sql_query/mod.rs     |  8 ++-
 .../src/dist_sql_query/physical_plan.rs            |  3 +-
 query_engine/src/datafusion_impl/task_context.rs   |  1 +
 server/src/grpc/remote_engine_service/mod.rs       | 59 ++++++++++++----------
 table_engine/src/remote/model.rs                   | 12 +++++
 7 files changed, 72 insertions(+), 45 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 27104165..b1f6b823 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -96,7 +96,7 @@ dependencies = [
  "atomic_enum",
  "base64 0.13.1",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "codec",
  "common_types",
  "datafusion",
@@ -1345,7 +1345,7 @@ dependencies = [
 [[package]]
 name = "ceresdbproto"
 version = "1.0.23"
-source = 
"git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4#d849fa44e29ea04c7d99c082a38efb8ce5200d5e";
+source = 
"git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc#cfdacccebb7c609cb1aac791b73ba9a838d7ade6";
 dependencies = [
  "prost",
  "protoc-bin-vendored",
@@ -1528,7 +1528,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "common_types",
  "etcd-client",
  "future_ext",
@@ -1606,7 +1606,7 @@ dependencies = [
  "arrow 43.0.0",
  "arrow_ext",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "chrono",
  "datafusion",
  "hash_ext",
@@ -2362,7 +2362,7 @@ dependencies = [
  "async-recursion",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -3921,7 +3921,7 @@ name = "meta_client"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -4446,7 +4446,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "chrono",
  "clru",
  "crc",
@@ -5323,7 +5323,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "clru",
  "cluster",
  "common_types",
@@ -5451,7 +5451,7 @@ dependencies = [
  "arrow 43.0.0",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "chrono",
  "cluster",
  "codec",
@@ -5765,7 +5765,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "arrow_ext",
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -5894,7 +5894,7 @@ name = "router"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "cluster",
  "common_types",
  "generic_error",
@@ -6269,7 +6269,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "clru",
  "cluster",
  "common_types",
@@ -6795,7 +6795,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "codec",
  "common_types",
  "futures 0.3.28",
@@ -6817,7 +6817,7 @@ dependencies = [
  "arrow_ext",
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -7020,7 +7020,7 @@ dependencies = [
 name = "time_ext"
 version = "1.2.6-alpha"
 dependencies = [
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "chrono",
  "common_types",
  "macros",
@@ -7672,7 +7672,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
  "chrono",
  "codec",
  "common_types",
diff --git a/Cargo.toml b/Cargo.toml
index c99bbaf5..1588c237 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git";, rev = 
"d849fa4" }
+ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git";, rev = 
"cfdaccc" }
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs 
b/df_engine_extensions/src/dist_sql_query/mod.rs
index abfc0cca..c2cd8250 100644
--- a/df_engine_extensions/src/dist_sql_query/mod.rs
+++ b/df_engine_extensions/src/dist_sql_query/mod.rs
@@ -66,13 +66,19 @@ type ExecutableScanBuilderRef = Box<dyn 
ExecutableScanBuilder>;
 pub struct RemoteTaskContext {
     pub task_ctx: Arc<TaskContext>,
     pub remote_metrics: Arc<Mutex<Option<String>>>,
+    pub is_analyze: bool,
 }
 
 impl RemoteTaskContext {
-    pub fn new(task_ctx: Arc<TaskContext>, remote_metrics: 
Arc<Mutex<Option<String>>>) -> Self {
+    pub fn new(
+        task_ctx: Arc<TaskContext>,
+        remote_metrics: Arc<Mutex<Option<String>>>,
+        is_analyze: bool,
+    ) -> Self {
         Self {
             task_ctx,
             remote_metrics,
+            is_analyze,
         }
     }
 }
diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs 
b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index 87cd18bd..1ebe669f 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -349,7 +349,8 @@ impl ExecutionPlan for ResolvedPartitionedScan {
             remote_metrics,
         } = &self.remote_exec_ctx.plan_ctxs[partition];
 
-        let remote_task_ctx = RemoteTaskContext::new(context, 
remote_metrics.clone());
+        let remote_task_ctx =
+            RemoteTaskContext::new(context, remote_metrics.clone(), 
self.is_analyze);
 
         // Send plan for remote execution.
         let stream_future = self.remote_exec_ctx.executor.execute(
diff --git a/query_engine/src/datafusion_impl/task_context.rs 
b/query_engine/src/datafusion_impl/task_context.rs
index 8aefd563..5e34cdc7 100644
--- a/query_engine/src/datafusion_impl/task_context.rs
+++ b/query_engine/src/datafusion_impl/task_context.rs
@@ -202,6 +202,7 @@ impl RemotePhysicalPlanExecutor for 
RemotePhysicalPlanExecutorImpl {
             default_schema,
             query: display_plan.indent(true).to_string(),
             priority,
+            is_analyze: task_context.is_analyze,
         };
 
         // Encode plan and schema
diff --git a/server/src/grpc/remote_engine_service/mod.rs 
b/server/src/grpc/remote_engine_service/mod.rs
index eda9a61a..df201a12 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -220,17 +220,17 @@ impl<M: MetricCollector> Drop for StreamWithMetric<M> {
 
 struct RemoteExecStream {
     inner: BoxStream<'static, Result<RecordBatch>>,
-    physical_plan: Option<PhysicalPlanRef>,
+    physical_plan_for_explain: Option<PhysicalPlanRef>,
 }
 
 impl RemoteExecStream {
     fn new(
         inner: BoxStream<'static, Result<RecordBatch>>,
-        physical_plan: Option<PhysicalPlanRef>,
+        physical_plan_for_explain: Option<PhysicalPlanRef>,
     ) -> Self {
         Self {
             inner,
-            physical_plan,
+            physical_plan_for_explain,
         }
     }
 }
@@ -240,19 +240,25 @@ impl Stream for RemoteExecStream {
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
         let this = self.get_mut();
-        match this.inner.poll_next_unpin(cx) {
-            Poll::Ready(Some(res)) => {
-                Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)))
-            }
-            Poll::Ready(None) => match &this.physical_plan {
-                Some(physical_plan) => {
-                    let metrics = physical_plan.metrics_to_string();
-                    this.physical_plan = None;
-                    
Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))))
+        let is_explain = this.physical_plan_for_explain.is_some();
+        loop {
+            match this.inner.poll_next_unpin(cx) {
+                Poll::Ready(Some(res)) => {
+                    // If the request is explain, we try drain the stream to 
get the metrics.
+                    if !is_explain {
+                        return 
Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)));
+                    }
                 }
-                None => Poll::Ready(None),
-            },
-            Poll::Pending => Poll::Pending,
+                Poll::Ready(None) => match &this.physical_plan_for_explain {
+                    Some(physical_plan) => {
+                        let metrics = physical_plan.metrics_to_string();
+                        this.physical_plan_for_explain = None;
+                        return 
Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))));
+                    }
+                    None => return Poll::Ready(None),
+                },
+                Poll::Pending => return Poll::Pending,
+            }
         }
     }
 }
@@ -715,15 +721,16 @@ impl RemoteEngineServiceImpl {
             slow_threshold_secs,
             query_ctx.priority,
         );
-        let physical_plan = 
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
-            encoded_plan,
-        )));
+        let physical_plan: PhysicalPlanRef = 
Arc::new(DataFusionPhysicalPlanAdapter::new(
+            TypedPlan::Remote(encoded_plan),
+        ));
+        // TODO: Use in handle_execute_plan fn to build stream with metrics
+        let physical_plan_for_explain = ctx.explain.map(|_| 
physical_plan.clone());
 
         let rt = self
             .runtimes
             .read_runtime
             .choose_runtime(&query_ctx.priority);
-        let physical_plan_clone = physical_plan.clone();
 
         let stream = rt
             .spawn(async move { handle_execute_plan(query_ctx, physical_plan, 
query_engine).await })
@@ -743,7 +750,7 @@ impl RemoteEngineServiceImpl {
         let stream = StreamWithMetric::new(Box::pin(stream), metric);
         Ok(RemoteExecStream::new(
             Box::pin(stream),
-            Some(physical_plan_clone),
+            physical_plan_for_explain,
         ))
     }
 
@@ -778,11 +785,11 @@ impl RemoteEngineServiceImpl {
             encoded_plan: encoded_plan.clone(),
         };
 
-        let physical_plan = 
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
-            encoded_plan,
-        )));
-
-        let physical_plan_clone = physical_plan.clone();
+        let physical_plan: PhysicalPlanRef = 
Arc::new(DataFusionPhysicalPlanAdapter::new(
+            TypedPlan::Remote(encoded_plan),
+        ));
+        // TODO: Use in handle_execute_plan fn to build stream with metrics
+        let physical_plan_for_explain = ctx.explain.map(|_| 
physical_plan.clone());
 
         let QueryDedup {
             config,
@@ -822,7 +829,7 @@ impl RemoteEngineServiceImpl {
         let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), 
metric);
         Ok(RemoteExecStream::new(
             Box::pin(stream),
-            Some(physical_plan_clone),
+            physical_plan_for_explain,
         ))
     }
 
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 9073e75e..deb28538 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -456,6 +456,9 @@ pub struct ExecContext {
     pub default_schema: String,
     pub query: String,
     pub priority: Priority,
+    // TOOO: there are many explain types, we need to support them all.
+    // A proper way is to define a enum for all explain types.
+    pub is_analyze: bool,
 }
 
 pub enum PhysicalPlan {
@@ -470,6 +473,11 @@ impl From<RemoteExecuteRequest> for 
ceresdbproto::remote_engine::ExecutePlanRequ
             NO_TIMEOUT
         };
 
+        let explain = if value.context.is_analyze {
+            Some(ceresdbproto::remote_engine::Explain::Analyze)
+        } else {
+            None
+        };
         let pb_context = ceresdbproto::remote_engine::ExecContext {
             request_id: 0, // not used any more
             request_id_str: value.context.request_id.to_string(),
@@ -478,6 +486,7 @@ impl From<RemoteExecuteRequest> for 
ceresdbproto::remote_engine::ExecutePlanRequ
             timeout_ms: rest_duration_ms,
             priority: value.context.priority.as_u8() as i32,
             displayable_query: value.context.query,
+            explain: explain.map(|v| v as i32),
         };
 
         let pb_plan = match value.physical_plan {
@@ -522,8 +531,10 @@ impl 
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
             default_schema,
             timeout_ms,
             displayable_query,
+            explain,
             ..
         } = pb_exec_ctx;
+        let is_analyze = explain == 
Some(ceresdbproto::remote_engine::Explain::Analyze as i32);
 
         let request_id = RequestId::from(request_id_str);
         let deadline = if timeout_ms >= 0 {
@@ -539,6 +550,7 @@ impl 
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
             default_schema,
             query: displayable_query,
             priority,
+            is_analyze,
         };
 
         // Plan


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to