This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new 61b123ab refactor: avoid returning metrics in non-analyze sql (#1410)
61b123ab is described below
commit 61b123ab4006942bf4261e7880e1233d220f7deb
Author: 鲍金日 <[email protected]>
AuthorDate: Wed Jan 3 10:23:37 2024 +0800
refactor: avoid returning metrics in non-analyze sql (#1410)
## Rationale
In #1260, we implemented distributed analyze, but for queries that are not
analyze queries, metrics are still returned, which leads to a decrease in
query performance. Therefore, we fix it in this PR so that metrics
are not returned for normal queries.
## Detailed Changes
- Add an `is_analyze` field to determine whether the query is an analyze query
## Test Plan
Existing tests
---------
Co-authored-by: jiacai2050 <[email protected]>
---
Cargo.lock | 32 ++++++------
Cargo.toml | 2 +-
df_engine_extensions/src/dist_sql_query/mod.rs | 8 ++-
.../src/dist_sql_query/physical_plan.rs | 3 +-
query_engine/src/datafusion_impl/task_context.rs | 1 +
server/src/grpc/remote_engine_service/mod.rs | 59 ++++++++++++----------
table_engine/src/remote/model.rs | 12 +++++
7 files changed, 72 insertions(+), 45 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 27104165..b1f6b823 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -96,7 +96,7 @@ dependencies = [
"atomic_enum",
"base64 0.13.1",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"codec",
"common_types",
"datafusion",
@@ -1345,7 +1345,7 @@ dependencies = [
[[package]]
name = "ceresdbproto"
version = "1.0.23"
-source =
"git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4#d849fa44e29ea04c7d99c082a38efb8ce5200d5e"
+source =
"git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc#cfdacccebb7c609cb1aac791b73ba9a838d7ade6"
dependencies = [
"prost",
"protoc-bin-vendored",
@@ -1528,7 +1528,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"common_types",
"etcd-client",
"future_ext",
@@ -1606,7 +1606,7 @@ dependencies = [
"arrow 43.0.0",
"arrow_ext",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"chrono",
"datafusion",
"hash_ext",
@@ -2362,7 +2362,7 @@ dependencies = [
"async-recursion",
"async-trait",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"common_types",
"datafusion",
"datafusion-proto",
@@ -3921,7 +3921,7 @@ name = "meta_client"
version = "1.2.6-alpha"
dependencies = [
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"common_types",
"futures 0.3.28",
"generic_error",
@@ -4446,7 +4446,7 @@ version = "1.2.6-alpha"
dependencies = [
"async-trait",
"bytes",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"chrono",
"clru",
"crc",
@@ -5323,7 +5323,7 @@ dependencies = [
"async-trait",
"bytes",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"clru",
"cluster",
"common_types",
@@ -5451,7 +5451,7 @@ dependencies = [
"arrow 43.0.0",
"async-trait",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"chrono",
"cluster",
"codec",
@@ -5765,7 +5765,7 @@ version = "1.2.6-alpha"
dependencies = [
"arrow_ext",
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"common_types",
"futures 0.3.28",
"generic_error",
@@ -5894,7 +5894,7 @@ name = "router"
version = "1.2.6-alpha"
dependencies = [
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"cluster",
"common_types",
"generic_error",
@@ -6269,7 +6269,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"clru",
"cluster",
"common_types",
@@ -6795,7 +6795,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"codec",
"common_types",
"futures 0.3.28",
@@ -6817,7 +6817,7 @@ dependencies = [
"arrow_ext",
"async-trait",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"common_types",
"datafusion",
"datafusion-proto",
@@ -7020,7 +7020,7 @@ dependencies = [
name = "time_ext"
version = "1.2.6-alpha"
dependencies = [
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"chrono",
"common_types",
"macros",
@@ -7672,7 +7672,7 @@ version = "1.2.6-alpha"
dependencies = [
"async-trait",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
"chrono",
"codec",
"common_types",
diff --git a/Cargo.toml b/Cargo.toml
index c99bbaf5..1588c237 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
-ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev =
"d849fa4" }
+ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev =
"cfdaccc" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs
b/df_engine_extensions/src/dist_sql_query/mod.rs
index abfc0cca..c2cd8250 100644
--- a/df_engine_extensions/src/dist_sql_query/mod.rs
+++ b/df_engine_extensions/src/dist_sql_query/mod.rs
@@ -66,13 +66,19 @@ type ExecutableScanBuilderRef = Box<dyn
ExecutableScanBuilder>;
pub struct RemoteTaskContext {
pub task_ctx: Arc<TaskContext>,
pub remote_metrics: Arc<Mutex<Option<String>>>,
+ pub is_analyze: bool,
}
impl RemoteTaskContext {
- pub fn new(task_ctx: Arc<TaskContext>, remote_metrics:
Arc<Mutex<Option<String>>>) -> Self {
+ pub fn new(
+ task_ctx: Arc<TaskContext>,
+ remote_metrics: Arc<Mutex<Option<String>>>,
+ is_analyze: bool,
+ ) -> Self {
Self {
task_ctx,
remote_metrics,
+ is_analyze,
}
}
}
diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index 87cd18bd..1ebe669f 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -349,7 +349,8 @@ impl ExecutionPlan for ResolvedPartitionedScan {
remote_metrics,
} = &self.remote_exec_ctx.plan_ctxs[partition];
- let remote_task_ctx = RemoteTaskContext::new(context,
remote_metrics.clone());
+ let remote_task_ctx =
+ RemoteTaskContext::new(context, remote_metrics.clone(),
self.is_analyze);
// Send plan for remote execution.
let stream_future = self.remote_exec_ctx.executor.execute(
diff --git a/query_engine/src/datafusion_impl/task_context.rs
b/query_engine/src/datafusion_impl/task_context.rs
index 8aefd563..5e34cdc7 100644
--- a/query_engine/src/datafusion_impl/task_context.rs
+++ b/query_engine/src/datafusion_impl/task_context.rs
@@ -202,6 +202,7 @@ impl RemotePhysicalPlanExecutor for
RemotePhysicalPlanExecutorImpl {
default_schema,
query: display_plan.indent(true).to_string(),
priority,
+ is_analyze: task_context.is_analyze,
};
// Encode plan and schema
diff --git a/server/src/grpc/remote_engine_service/mod.rs
b/server/src/grpc/remote_engine_service/mod.rs
index eda9a61a..df201a12 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -220,17 +220,17 @@ impl<M: MetricCollector> Drop for StreamWithMetric<M> {
struct RemoteExecStream {
inner: BoxStream<'static, Result<RecordBatch>>,
- physical_plan: Option<PhysicalPlanRef>,
+ physical_plan_for_explain: Option<PhysicalPlanRef>,
}
impl RemoteExecStream {
fn new(
inner: BoxStream<'static, Result<RecordBatch>>,
- physical_plan: Option<PhysicalPlanRef>,
+ physical_plan_for_explain: Option<PhysicalPlanRef>,
) -> Self {
Self {
inner,
- physical_plan,
+ physical_plan_for_explain,
}
}
}
@@ -240,19 +240,25 @@ impl Stream for RemoteExecStream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
let this = self.get_mut();
- match this.inner.poll_next_unpin(cx) {
- Poll::Ready(Some(res)) => {
- Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)))
- }
- Poll::Ready(None) => match &this.physical_plan {
- Some(physical_plan) => {
- let metrics = physical_plan.metrics_to_string();
- this.physical_plan = None;
-
Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))))
+ let is_explain = this.physical_plan_for_explain.is_some();
+ loop {
+ match this.inner.poll_next_unpin(cx) {
+ Poll::Ready(Some(res)) => {
+ // If the request is explain, we try drain the stream to
get the metrics.
+ if !is_explain {
+ return
Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)));
+ }
}
- None => Poll::Ready(None),
- },
- Poll::Pending => Poll::Pending,
+ Poll::Ready(None) => match &this.physical_plan_for_explain {
+ Some(physical_plan) => {
+ let metrics = physical_plan.metrics_to_string();
+ this.physical_plan_for_explain = None;
+ return
Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))));
+ }
+ None => return Poll::Ready(None),
+ },
+ Poll::Pending => return Poll::Pending,
+ }
}
}
}
@@ -715,15 +721,16 @@ impl RemoteEngineServiceImpl {
slow_threshold_secs,
query_ctx.priority,
);
- let physical_plan =
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
- encoded_plan,
- )));
+ let physical_plan: PhysicalPlanRef =
Arc::new(DataFusionPhysicalPlanAdapter::new(
+ TypedPlan::Remote(encoded_plan),
+ ));
+ // TODO: Use in handle_execute_plan fn to build stream with metrics
+ let physical_plan_for_explain = ctx.explain.map(|_|
physical_plan.clone());
let rt = self
.runtimes
.read_runtime
.choose_runtime(&query_ctx.priority);
- let physical_plan_clone = physical_plan.clone();
let stream = rt
.spawn(async move { handle_execute_plan(query_ctx, physical_plan,
query_engine).await })
@@ -743,7 +750,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(stream), metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
- Some(physical_plan_clone),
+ physical_plan_for_explain,
))
}
@@ -778,11 +785,11 @@ impl RemoteEngineServiceImpl {
encoded_plan: encoded_plan.clone(),
};
- let physical_plan =
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
- encoded_plan,
- )));
-
- let physical_plan_clone = physical_plan.clone();
+ let physical_plan: PhysicalPlanRef =
Arc::new(DataFusionPhysicalPlanAdapter::new(
+ TypedPlan::Remote(encoded_plan),
+ ));
+ // TODO: Use in handle_execute_plan fn to build stream with metrics
+ let physical_plan_for_explain = ctx.explain.map(|_|
physical_plan.clone());
let QueryDedup {
config,
@@ -822,7 +829,7 @@ impl RemoteEngineServiceImpl {
let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)),
metric);
Ok(RemoteExecStream::new(
Box::pin(stream),
- Some(physical_plan_clone),
+ physical_plan_for_explain,
))
}
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 9073e75e..deb28538 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -456,6 +456,9 @@ pub struct ExecContext {
pub default_schema: String,
pub query: String,
pub priority: Priority,
+ // TOOO: there are many explain types, we need to support them all.
+ // A proper way is to define a enum for all explain types.
+ pub is_analyze: bool,
}
pub enum PhysicalPlan {
@@ -470,6 +473,11 @@ impl From<RemoteExecuteRequest> for
ceresdbproto::remote_engine::ExecutePlanRequ
NO_TIMEOUT
};
+ let explain = if value.context.is_analyze {
+ Some(ceresdbproto::remote_engine::Explain::Analyze)
+ } else {
+ None
+ };
let pb_context = ceresdbproto::remote_engine::ExecContext {
request_id: 0, // not used any more
request_id_str: value.context.request_id.to_string(),
@@ -478,6 +486,7 @@ impl From<RemoteExecuteRequest> for
ceresdbproto::remote_engine::ExecutePlanRequ
timeout_ms: rest_duration_ms,
priority: value.context.priority.as_u8() as i32,
displayable_query: value.context.query,
+ explain: explain.map(|v| v as i32),
};
let pb_plan = match value.physical_plan {
@@ -522,8 +531,10 @@ impl
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_schema,
timeout_ms,
displayable_query,
+ explain,
..
} = pb_exec_ctx;
+ let is_analyze = explain ==
Some(ceresdbproto::remote_engine::Explain::Analyze as i32);
let request_id = RequestId::from(request_id_str);
let deadline = if timeout_ms >= 0 {
@@ -539,6 +550,7 @@ impl
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_schema,
query: displayable_query,
priority,
+ is_analyze,
};
// Plan
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]