This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new 904e2d5e feat: block rules support query (#1420)
904e2d5e is described below

commit 904e2d5ede4d1a68c9437b6143065f283f81fbd0
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Jan 4 14:42:18 2024 +0800

    feat: block rules support query (#1420)
    
    ## Rationale
    Queries over a long time range usually consume too many resources, which
    affects the stability of the whole cluster.
    
    ## Detailed Changes
    - Support blocking queries by their time range
    
    
    ## Test Plan
    Manually
    ```bash
    
    curl 0:5000/admin/block -H 'content-type: application/json' -d '
    {
      "operation": "Set",
      "write_block_list": [],
      "read_block_list": [],
      "block_rules": [
        {"type": "QueryRange", "content": "24h"}
      ]
    }'
    ```
---
 proxy/src/limiter.rs       | 44 +++++++++++++++++++++++++++++++++++++++++---
 proxy/src/metrics.rs       |  6 ++++++
 proxy/src/read.rs          |  2 +-
 query_frontend/src/plan.rs | 27 +++++++++++++++++++++++++++
 4 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs
index 9405dd9e..ea6b697d 100644
--- a/proxy/src/limiter.rs
+++ b/proxy/src/limiter.rs
@@ -12,13 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{collections::HashSet, sync::RwLock};
+use std::{collections::HashSet, str::FromStr, sync::RwLock};
 
 use datafusion::logical_expr::logical_plan::LogicalPlan;
 use macros::define_result;
 use query_frontend::plan::Plan;
 use serde::{Deserialize, Serialize};
 use snafu::Snafu;
+use time_ext::ReadableDuration;
+
+use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;
 
 #[derive(Snafu, Debug)]
 #[snafu(visibility(pub))]
@@ -33,12 +36,26 @@ pub enum Error {
 define_result!(Error);
 
 #[derive(Clone, Copy, Deserialize, Debug, PartialEq, Eq, Hash, Serialize, 
PartialOrd, Ord)]
+#[serde(tag = "type", content = "content")]
 pub enum BlockRule {
     QueryWithoutPredicate,
+    /// Max time range a query can scan.
+    #[serde(deserialize_with = "deserialize_readable_duration")]
+    QueryRange(i64),
     AnyQuery,
     AnyInsert,
 }
 
+fn deserialize_readable_duration<'de, D>(deserializer: D) -> 
std::result::Result<i64, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let s: &str = Deserialize::deserialize(deserializer)?;
+    ReadableDuration::from_str(s)
+        .map(|d| d.0.as_millis() as i64)
+        .map_err(serde::de::Error::custom)
+}
+
 #[derive(Default, Clone, Deserialize, Debug, Serialize)]
 #[serde(default)]
 pub struct LimiterConfig {
@@ -52,6 +69,17 @@ impl BlockRule {
         match self {
             BlockRule::QueryWithoutPredicate => 
self.is_query_without_predicate(plan),
             BlockRule::AnyQuery => matches!(plan, Plan::Query(_)),
+            BlockRule::QueryRange(threshold) => {
+                if let Plan::Query(plan) = plan {
+                    if let Some(range) = plan.query_range() {
+                        if range > *threshold {
+                            return true;
+                        }
+                    }
+                }
+
+                false
+            }
             BlockRule::AnyInsert => matches!(plan, Plan::Insert(_)),
         }
     }
@@ -159,8 +187,18 @@ impl Limiter {
     ///
     /// Error will throws if the plan is forbidden to execute.
     pub fn try_limit(&self, plan: &Plan) -> Result<()> {
-        self.try_limit_by_block_list(plan)?;
-        self.try_limit_by_rules(plan)
+        let result = {
+            self.try_limit_by_block_list(plan)?;
+            self.try_limit_by_rules(plan)
+        };
+
+        if result.is_err() {
+            BLOCKED_REQUEST_COUNTER_VEC_GLOBAL
+                .with_label_values(&[plan.plan_type()])
+                .inc();
+        }
+
+        result
     }
 
     pub fn add_write_block_list(&self, block_list: Vec<String>) {
diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs
index c55c47c2..3478b6bd 100644
--- a/proxy/src/metrics.rs
+++ b/proxy/src/metrics.rs
@@ -61,6 +61,12 @@ lazy_static! {
     pub static ref HTTP_HANDLER_COUNTER_VEC_GLOBAL: IntCounterVec =
         register_int_counter_vec!("http_handler_counter", "Http handler 
counter", &["type"])
             .unwrap();
+    pub static ref BLOCKED_REQUEST_COUNTER_VEC_GLOBAL: IntCounterVec = 
register_int_counter_vec!(
+        "blocked_request_counter",
+        "Blocked request counter",
+        &["type"]
+    )
+    .unwrap();
 }
 
 lazy_static! {
diff --git a/proxy/src/read.rs b/proxy/src/read.rs
index effb88c0..e821750c 100644
--- a/proxy/src/read.rs
+++ b/proxy/src/read.rs
@@ -240,7 +240,7 @@ impl Proxy {
                 .try_limit(&plan)
                 .box_err()
                 .context(Internal {
-                    msg: "Request is blocked",
+                    msg: format!("Request is blocked, 
table_name:{table_name:?}"),
                 })?;
         }
 
diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs
index 94c8bca6..597c6810 100644
--- a/query_frontend/src/plan.rs
+++ b/query_frontend/src/plan.rs
@@ -77,6 +77,21 @@ pub enum Plan {
     Exists(ExistsTablePlan),
 }
 
+impl Plan {
+    pub fn plan_type(&self) -> &str {
+        match self {
+            Self::Query(_) => "query",
+            Self::Insert(_) => "insert",
+            Self::Create(_)
+            | Self::Drop(_)
+            | Self::Describe(_)
+            | Self::AlterTable(_)
+            | Self::Show(_)
+            | Self::Exists(_) => "other",
+        }
+    }
+}
+
 pub struct PriorityContext {
     pub time_range_threshold: u64,
 }
@@ -201,6 +216,18 @@ impl QueryPlan {
 
         Some(priority)
     }
+
+    /// When query contains invalid time range such as `[200, 100]`, it will
+    /// return None.
+    pub fn query_range(&self) -> Option<i64> {
+        self.extract_time_range().map(|time_range| {
+            time_range
+                .exclusive_end()
+                .as_i64()
+                .checked_sub(time_range.inclusive_start().as_i64())
+                .unwrap_or(i64::MAX)
+        })
+    }
 }
 
 impl Debug for QueryPlan {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to