This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new 904e2d5e feat: block rules support query (#1420)
904e2d5e is described below
commit 904e2d5ede4d1a68c9437b6143065f283f81fbd0
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Jan 4 14:42:18 2024 +0800
feat: block rules support query (#1420)
## Rationale
Queries over a long time range usually consume too many resources, which
affects the stability of the whole cluster.
## Detailed Changes
- Support blocking queries by query time range
## Test Plan
Tested manually with the following request:
```bash
curl 0:5000/admin/block -H 'content-type: application/json' -d '
{
"operation": "Set",
"write_block_list": [],
"read_block_list": [],
"block_rules": [
{"type": "QueryRange", "content": "24h"}
]
}'
```
---
proxy/src/limiter.rs | 44 +++++++++++++++++++++++++++++++++++++++++---
proxy/src/metrics.rs | 6 ++++++
proxy/src/read.rs | 2 +-
query_frontend/src/plan.rs | 27 +++++++++++++++++++++++++++
4 files changed, 75 insertions(+), 4 deletions(-)
diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs
index 9405dd9e..ea6b697d 100644
--- a/proxy/src/limiter.rs
+++ b/proxy/src/limiter.rs
@@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::{collections::HashSet, sync::RwLock};
+use std::{collections::HashSet, str::FromStr, sync::RwLock};
use datafusion::logical_expr::logical_plan::LogicalPlan;
use macros::define_result;
use query_frontend::plan::Plan;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
+use time_ext::ReadableDuration;
+
+use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;
#[derive(Snafu, Debug)]
#[snafu(visibility(pub))]
@@ -33,12 +36,26 @@ pub enum Error {
define_result!(Error);
#[derive(Clone, Copy, Deserialize, Debug, PartialEq, Eq, Hash, Serialize,
PartialOrd, Ord)]
+#[serde(tag = "type", content = "content")]
pub enum BlockRule {
QueryWithoutPredicate,
+ /// Max time range a query can scan.
+ #[serde(deserialize_with = "deserialize_readable_duration")]
+ QueryRange(i64),
AnyQuery,
AnyInsert,
}
+fn deserialize_readable_duration<'de, D>(deserializer: D) ->
std::result::Result<i64, D::Error>
+where
+ D: serde::Deserializer<'de>,
+{
+ let s: &str = Deserialize::deserialize(deserializer)?;
+ ReadableDuration::from_str(s)
+ .map(|d| d.0.as_millis() as i64)
+ .map_err(serde::de::Error::custom)
+}
+
#[derive(Default, Clone, Deserialize, Debug, Serialize)]
#[serde(default)]
pub struct LimiterConfig {
@@ -52,6 +69,17 @@ impl BlockRule {
match self {
BlockRule::QueryWithoutPredicate =>
self.is_query_without_predicate(plan),
BlockRule::AnyQuery => matches!(plan, Plan::Query(_)),
+ BlockRule::QueryRange(threshold) => {
+ if let Plan::Query(plan) = plan {
+ if let Some(range) = plan.query_range() {
+ if range > *threshold {
+ return true;
+ }
+ }
+ }
+
+ false
+ }
BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)),
}
}
@@ -159,8 +187,18 @@ impl Limiter {
///
/// Error will throws if the plan is forbidden to execute.
pub fn try_limit(&self, plan: &Plan) -> Result<()> {
- self.try_limit_by_block_list(plan)?;
- self.try_limit_by_rules(plan)
+ let result = {
+ self.try_limit_by_block_list(plan)?;
+ self.try_limit_by_rules(plan)
+ };
+
+ if result.is_err() {
+ BLOCKED_REQUEST_COUNTER_VEC_GLOBAL
+ .with_label_values(&[plan.plan_type()])
+ .inc();
+ }
+
+ result
}
pub fn add_write_block_list(&self, block_list: Vec<String>) {
diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs
index c55c47c2..3478b6bd 100644
--- a/proxy/src/metrics.rs
+++ b/proxy/src/metrics.rs
@@ -61,6 +61,12 @@ lazy_static! {
pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec =
register_int_counter_vec!("http_handler_counter", "Http handler
counter", &["type"])
.unwrap();
+ pub static ref BLOCKED_REQUEST_COUNTER_VEC_GLOBAL: IntCounterVec =
register_int_counter_vec!(
+ "blocked_request_counter",
+ "Blocked request counter",
+ &["type"]
+ )
+ .unwrap();
}
lazy_static! {
diff --git a/proxy/src/read.rs b/proxy/src/read.rs
index effb88c0..e821750c 100644
--- a/proxy/src/read.rs
+++ b/proxy/src/read.rs
@@ -240,7 +240,7 @@ impl Proxy {
.try_limit(&plan)
.box_err()
.context(Internal {
- msg: "Request is blocked",
+ msg: format!("Request is blocked,
table_name:{table_name:?}"),
})?;
}
diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs
index 94c8bca6..597c6810 100644
--- a/query_frontend/src/plan.rs
+++ b/query_frontend/src/plan.rs
@@ -77,6 +77,21 @@ pub enum Plan {
Exists(ExistsTablePlan),
}
+impl Plan {
+ pub fn plan_type(&self) -> &str {
+ match self {
+ Self::Query(_) => "query",
+ Self::Insert(_) => "insert",
+ Self::Create(_)
+ | Self::Drop(_)
+ | Self::Describe(_)
+ | Self::AlterTable(_)
+ | Self::Show(_)
+ | Self::Exists(_) => "other",
+ }
+ }
+}
+
pub struct PriorityContext {
pub time_range_threshold: u64,
}
@@ -201,6 +216,18 @@ impl QueryPlan {
Some(priority)
}
+
+ /// When query contains invalid time range such as `[200, 100]`, it will
+ /// return None.
+ pub fn query_range(&self) -> Option<i64> {
+ self.extract_time_range().map(|time_range| {
+ time_range
+ .exclusive_end()
+ .as_i64()
+ .checked_sub(time_range.inclusive_start().as_i64())
+ .unwrap_or(i64::MAX)
+ })
+ }
}
impl Debug for QueryPlan {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]