This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new a1db8828 feat: implement opentsdb query (#1453)
a1db8828 is described below

commit a1db882892013448c2ba93f6bee620aa2b021201
Author: 鲍金日 <[email protected]>
AuthorDate: Fri Mar 29 18:45:33 2024 +0800

    feat: implement opentsdb query (#1453)
    
    ## Rationale
    The OpenTSDB write protocol is already supported; this PR implements the
    query protocol.
    
    ## Detailed Changes
    - Convert OpenTSDB query requests into DataFusion logical plans
    - Convert the query results (RecordBatch) into the OpenTSDB query
    response format
    
    ## Test Plan
    - Existing tests
    - Add new unit tests and integration tests
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 Cargo.lock                                         |   2 +
 .../cases/env/local/opentsdb/basic.result          | 106 ++++++
 .../cases/env/local/opentsdb/basic.sql             |  93 +++++
 integration_tests/src/database.rs                  |  28 ++
 src/common_types/src/tests.rs                      |  41 ++
 src/proxy/src/opentsdb/mod.rs                      |  97 ++++-
 src/proxy/src/opentsdb/types.rs                    | 372 +++++++++++++++++-
 src/query_frontend/Cargo.toml                      |   2 +
 .../src/{lib.rs => datafusion_util.rs}             |  37 +-
 src/query_frontend/src/frontend.rs                 |  15 +
 src/query_frontend/src/lib.rs                      |   3 +
 src/query_frontend/src/opentsdb/mod.rs             | 417 +++++++++++++++++++++
 src/query_frontend/src/opentsdb/types.rs           |  66 ++++
 src/query_frontend/src/planner.rs                  |  12 +
 src/query_frontend/src/tests.rs                    |  11 +-
 src/server/src/http.rs                             |  19 +-
 16 files changed, 1288 insertions(+), 33 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d6c3ead8..e8e6fa7e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5572,6 +5572,8 @@ dependencies = [
  "regex-syntax 0.6.29",
  "runtime",
  "schema",
+ "serde",
+ "serde_json",
  "snafu 0.6.10",
  "sqlparser",
  "table_engine",
diff --git a/integration_tests/cases/env/local/opentsdb/basic.result 
b/integration_tests/cases/env/local/opentsdb/basic.result
new file mode 100644
index 00000000..d3f7f444
--- /dev/null
+++ b/integration_tests/cases/env/local/opentsdb/basic.result
@@ -0,0 +1,106 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+DROP TABLE IF EXISTS `opentsdb_table1`;
+
+affected_rows: 0
+
+CREATE TABLE `opentsdb_table1` (
+    `time` timestamp NOT NULL,
+    `level_description` string TAG,
+    `location` string TAG,
+    `value` double,
+    timestamp KEY (time)) ENGINE = Analytic WITH (
+    enable_ttl = 'false'
+);
+
+affected_rows: 0
+
+-- Insert Records:
+-- ("2015-08-18T00:00:00Z", "between 6 and 9 feet", "coyote_creek", 8.12),
+-- ("2015-08-18T00:00:00Z", "below 3 feet", "santa_monica", 2.064),
+-- ("2015-08-18T00:06:00Z", "between 6 and 9 feet", "coyote_creek", 8.005),
+-- ("2015-08-18T00:06:00Z", "below 3 feet", "santa_monica", 2.116),
+-- ("2015-08-18T00:12:00Z", "between 6 and 9 feet", "coyote_creek", 7.887),
+-- ("2015-08-18T00:12:00Z", "below 3 feet", "santa_monica", 2.028);
+INSERT INTO opentsdb_table1(time, level_description, location, value)
+    VALUES
+        (1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
+        (1439827200000, "below 3 feet", "santa_monica", 2.064),
+        (1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
+        (1439827560000, "below 3 feet", "santa_monica", 2.116),
+        (1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
+        (1439827620000, "below 3 feet", "santa_monica", 2.028);
+
+affected_rows: 6
+
+-- SQLNESS ARG protocol=opentsdb
+{
+  "start": 1439827200000,
+  "end": 1439827620000,
+  "queries": [
+    {
+      "aggregator": "none",
+      "metric": "opentsdb_table1",
+      "tags": {}
+    }
+  ]
+}
+;
+
+[{"metric":"opentsdb_table1","tags":{"level_description":"below 3 
feet","location":"santa_monica"},"aggregatedTags":[],"dps":{"1439827200000":2.064,"1439827560000":2.116,"1439827620000":2.028}},{"metric":"opentsdb_table1","tags":{"level_description":"between
 6 and 9 
feet","location":"coyote_creek"},"aggregatedTags":[],"dps":{"1439827200000":8.12,"1439827560000":8.005,"1439827620000":7.887}}]
+
+-- SQLNESS ARG protocol=opentsdb
+{
+  "start": 1439827200000,
+  "end": 1439827620000,
+  "queries": [
+    {
+      "aggregator": "none",
+      "metric": "opentsdb_table1",
+      "tags": {
+         "location": "coyote_creek"
+      }
+    }
+  ]
+}
+;
+
+[{"metric":"opentsdb_table1","tags":{"level_description":"between 6 and 9 
feet","location":"coyote_creek"},"aggregatedTags":[],"dps":{"1439827200000":8.12,"1439827560000":8.005,"1439827620000":7.887}}]
+
+-- SQLNESS ARG protocol=opentsdb
+{
+  "start": 1439827200000,
+  "end": 1439827620000,
+  "queries": [
+    {
+      "aggregator": "sum",
+      "metric": "opentsdb_table1",
+      "tags": {
+      }
+    }
+  ]
+}
+;
+
+[{"metric":"opentsdb_table1","tags":{},"aggregatedTags":[],"dps":{"1439827200000":10.184,"1439827560000":10.121,"1439827620000":9.915}}]
+
+DROP TABLE IF EXISTS `opentsdb_table1`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/env/local/opentsdb/basic.sql 
b/integration_tests/cases/env/local/opentsdb/basic.sql
new file mode 100644
index 00000000..5ab91ec5
--- /dev/null
+++ b/integration_tests/cases/env/local/opentsdb/basic.sql
@@ -0,0 +1,93 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+DROP TABLE IF EXISTS `opentsdb_table1`;
+
+CREATE TABLE `opentsdb_table1` (
+    `time` timestamp NOT NULL,
+    `level_description` string TAG,
+    `location` string TAG,
+    `value` double,
+    timestamp KEY (time)) ENGINE = Analytic WITH (
+    enable_ttl = 'false'
+);
+
+-- Insert Records:
+-- ("2015-08-18T00:00:00Z", "between 6 and 9 feet", "coyote_creek", 8.12),
+-- ("2015-08-18T00:00:00Z", "below 3 feet", "santa_monica", 2.064),
+-- ("2015-08-18T00:06:00Z", "between 6 and 9 feet", "coyote_creek", 8.005),
+-- ("2015-08-18T00:06:00Z", "below 3 feet", "santa_monica", 2.116),
+-- ("2015-08-18T00:12:00Z", "between 6 and 9 feet", "coyote_creek", 7.887),
+-- ("2015-08-18T00:12:00Z", "below 3 feet", "santa_monica", 2.028);
+INSERT INTO opentsdb_table1(time, level_description, location, value)
+    VALUES
+        (1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
+        (1439827200000, "below 3 feet", "santa_monica", 2.064),
+        (1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
+        (1439827560000, "below 3 feet", "santa_monica", 2.116),
+        (1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
+        (1439827620000, "below 3 feet", "santa_monica", 2.028);
+
+
+-- SQLNESS ARG protocol=opentsdb
+{
+  "start": 1439827200000,
+  "end": 1439827620000,
+  "queries": [
+    {
+      "aggregator": "none",
+      "metric": "opentsdb_table1",
+      "tags": {}
+    }
+  ]
+}
+;
+
+-- SQLNESS ARG protocol=opentsdb
+{
+  "start": 1439827200000,
+  "end": 1439827620000,
+  "queries": [
+    {
+      "aggregator": "none",
+      "metric": "opentsdb_table1",
+      "tags": {
+         "location": "coyote_creek"
+      }
+    }
+  ]
+}
+;
+
+-- SQLNESS ARG protocol=opentsdb
+{
+  "start": 1439827200000,
+  "end": 1439827620000,
+  "queries": [
+    {
+      "aggregator": "sum",
+      "metric": "opentsdb_table1",
+      "tags": {
+      }
+    }
+  ]
+}
+;
+
+DROP TABLE IF EXISTS `opentsdb_table1`;
diff --git a/integration_tests/src/database.rs 
b/integration_tests/src/database.rs
index c6d1b91d..2020cd84 100644
--- a/integration_tests/src/database.rs
+++ b/integration_tests/src/database.rs
@@ -243,6 +243,7 @@ pub struct HoraeDB<T> {
 enum Protocol {
     Sql,
     InfluxQL,
+    OpenTSDB,
 }
 
 impl TryFrom<&str> for Protocol {
@@ -252,6 +253,7 @@ impl TryFrom<&str> for Protocol {
         let protocol = match s {
             "influxql" => Protocol::InfluxQL,
             "sql" => Protocol::Sql,
+            "opentsdb" => Protocol::OpenTSDB,
             _ => return Err(format!("unknown protocol:{s}")),
         };
 
@@ -312,6 +314,10 @@ impl<T: Send + Sync> Database for HoraeDB<T> {
                 let http_client = self.http_client.clone();
                 Self::execute_influxql(query, http_client, 
context.context).await
             }
+            Protocol::OpenTSDB => {
+                let http_client = self.http_client.clone();
+                Self::execute_opentsdb(query, http_client, 
context.context).await
+            }
         }
     }
 }
@@ -383,6 +389,28 @@ impl<T> HoraeDB<T> {
         Box::new(query_res)
     }
 
+    async fn execute_opentsdb(
+        query: String,
+        http_client: HttpClient,
+        _params: HashMap<String, String>,
+    ) -> Box<dyn Display> {
+        let query = query.trim().trim_end_matches(';');
+        let url = format!("http://{}/opentsdb/api/query", 
http_client.endpoint);
+        let resp = http_client
+            .client
+            .post(url)
+            .header("content-type", "application/json")
+            .body(query.to_string())
+            .send()
+            .await
+            .unwrap();
+        let query_res = match resp.text().await {
+            Ok(text) => text,
+            Err(e) => format!("Failed to do influxql query, err:{e:?}"),
+        };
+        Box::new(query_res)
+    }
+
     async fn execute_sql(query: String, client: Arc<dyn DbClient>) -> Box<dyn 
Display> {
         let query_ctx = RpcContext {
             database: Some("public".to_string()),
diff --git a/src/common_types/src/tests.rs b/src/common_types/src/tests.rs
index bb59a7c8..28872068 100644
--- a/src/common_types/src/tests.rs
+++ b/src/common_types/src/tests.rs
@@ -241,6 +241,47 @@ pub fn build_schema_for_cpu() -> Schema {
     builder.primary_key_indexes(vec![0, 1]).build().unwrap()
 }
 
+/// Build a schema for testing:
+/// (tsid(uint64), key2(timestamp), tag1(string), tag2(string), value(double),
+pub fn build_schema_for_metric() -> Schema {
+    let builder = schema::Builder::new()
+        .auto_increment_column_id(true)
+        .add_key_column(
+            column_schema::Builder::new(TSID_COLUMN.to_string(), 
DatumKind::UInt64)
+                .build()
+                .unwrap(),
+        )
+        .unwrap()
+        .add_key_column(
+            column_schema::Builder::new("timestamp".to_string(), 
DatumKind::Timestamp)
+                .build()
+                .unwrap(),
+        )
+        .unwrap()
+        .add_normal_column(
+            column_schema::Builder::new("tag1".to_string(), DatumKind::String)
+                .is_tag(true)
+                .build()
+                .unwrap(),
+        )
+        .unwrap()
+        .add_normal_column(
+            column_schema::Builder::new("tag2".to_string(), DatumKind::String)
+                .is_tag(true)
+                .build()
+                .unwrap(),
+        )
+        .unwrap()
+        .add_normal_column(
+            column_schema::Builder::new("value".to_string(), DatumKind::Double)
+                .build()
+                .unwrap(),
+        )
+        .unwrap();
+
+    builder.primary_key_indexes(vec![0, 1]).build().unwrap()
+}
+
 #[allow(clippy::too_many_arguments)]
 pub fn build_row_for_dictionary(
     key1: &[u8],
diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs
index d9f69810..aae4a4a2 100644
--- a/src/proxy/src/opentsdb/mod.rs
+++ b/src/proxy/src/opentsdb/mod.rs
@@ -15,20 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! This module implements [put][1] for OpenTSDB
+//! This module implements [put][1], [query][2] for OpenTSDB
 //! [1]: http://opentsdb.net/docs/build/html/api_http/put.html
+//! [2]: http://opentsdb.net/docs/build/html/api_http/query/index.html
 
+use std::time::Instant;
+
+use futures::{stream::FuturesOrdered, StreamExt};
+use generic_error::BoxError;
 use horaedbproto::storage::{
     RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest,
 };
 use http::StatusCode;
-use logger::debug;
+use logger::{debug, info};
+use query_frontend::{
+    frontend::{Context as SqlContext, Frontend},
+    opentsdb::types::QueryRequest,
+    provider::CatalogMetaProvider,
+};
+use snafu::ResultExt;
 
+use self::types::QueryResponse;
 use crate::{
     context::RequestContext,
-    error::{ErrNoCause, Result},
+    error::{ErrNoCause, ErrWithCause, Result},
     metrics::HTTP_HANDLER_COUNTER_VEC,
-    opentsdb::types::{convert_put_request, PutRequest, PutResponse},
+    opentsdb::types::{convert_output_to_response, convert_put_request, 
PutRequest, PutResponse},
     Context, Proxy,
 };
 
@@ -41,7 +53,6 @@ impl Proxy {
         req: PutRequest,
     ) -> Result<PutResponse> {
         let write_table_requests = convert_put_request(req)?;
-
         let num_rows: usize = write_table_requests
             .iter()
             .map(|req| {
@@ -93,4 +104,80 @@ impl Proxy {
             }
         }
     }
+
+    pub async fn handle_opentsdb_query(
+        &self,
+        ctx: RequestContext,
+        req: QueryRequest,
+    ) -> Result<Vec<QueryResponse>> {
+        let request_id = ctx.request_id;
+        let begin_instant = Instant::now();
+        let deadline = ctx.timeout.map(|t| begin_instant + t);
+
+        info!(
+            "Opentsdb query handler try to process request, request_id:{}, 
request:{:?}",
+            request_id, req
+        );
+
+        let provider = CatalogMetaProvider {
+            manager: self.instance.catalog_manager.clone(),
+            default_catalog: &ctx.catalog,
+            default_schema: &ctx.schema,
+            function_registry: &*self.instance.function_registry,
+        };
+        let frontend = Frontend::new(provider, 
self.instance.dyn_config.fronted.clone());
+        let sql_ctx = SqlContext::new(request_id.clone(), deadline);
+
+        let opentsdb_plan = frontend
+            .opentsdb_query_to_plan(&sql_ctx, req)
+            .box_err()
+            .with_context(|| ErrWithCause {
+                code: StatusCode::BAD_REQUEST,
+                msg: "Failed to build plan",
+            })?;
+
+        for plan in &opentsdb_plan.plans {
+            self.instance
+                .limiter
+                .try_limit(&plan.plan)
+                .box_err()
+                .context(ErrWithCause {
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
+                    msg: "Query is blocked",
+                })?;
+        }
+
+        let mut futures = FuturesOrdered::new();
+        for plan in opentsdb_plan.plans {
+            let request_id_clone = request_id.clone();
+            let one_resp = async {
+                let output = self
+                    .execute_plan(
+                        request_id_clone,
+                        &ctx.catalog,
+                        &ctx.schema,
+                        plan.plan,
+                        deadline,
+                    )
+                    .await?;
+
+                convert_output_to_response(
+                    output,
+                    plan.metric,
+                    plan.field_col_name,
+                    plan.timestamp_col_name,
+                    plan.tags,
+                    plan.aggregated_tags,
+                )
+            };
+
+            futures.push_back(one_resp);
+        }
+
+        let resp = futures.collect::<Vec<_>>().await;
+        let resp = resp.into_iter().collect::<Result<Vec<_>>>()?;
+        let resp = resp.into_iter().flatten().collect();
+
+        Ok(resp)
+    }
 }
diff --git a/src/proxy/src/opentsdb/types.rs b/src/proxy/src/opentsdb/types.rs
index 87f18833..bfeca1bd 100644
--- a/src/proxy/src/opentsdb/types.rs
+++ b/src/proxy/src/opentsdb/types.rs
@@ -16,24 +16,26 @@
 // under the License.
 
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
+    default::Default,
     fmt::Debug,
 };
 
 use bytes::Bytes;
+use common_types::{datum::DatumKind, record_batch::RecordBatch, 
schema::RecordSchema};
 use generic_error::BoxError;
 use horaedbproto::storage::{
     value, Field, FieldGroup, Tag, Value as ProtoValue, WriteSeriesEntry, 
WriteTableRequest,
 };
 use http::StatusCode;
-use serde::Deserialize;
+use interpreters::interpreter::Output;
+use query_frontend::opentsdb::DEFAULT_FIELD;
+use serde::{Deserialize, Serialize};
 use serde_json::from_slice;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use time_ext::try_to_millis;
 
-use crate::error::{ErrNoCause, ErrWithCause, Result};
-
-const OPENTSDB_DEFAULT_FIELD: &str = "value";
+use crate::error::{ErrNoCause, ErrWithCause, InternalNoCause, Result};
 
 #[derive(Debug)]
 pub struct PutRequest {
@@ -142,7 +144,7 @@ pub(crate) fn convert_put_request(req: PutRequest) -> 
Result<Vec<WriteTableReque
         let mut req = WriteTableRequest {
             table: metric,
             tag_names,
-            field_names: vec![String::from(OPENTSDB_DEFAULT_FIELD)],
+            field_names: vec![String::from(DEFAULT_FIELD)],
             entries: Vec::with_capacity(points.len()),
         };
 
@@ -213,3 +215,359 @@ pub(crate) fn validate(points: &[Point]) -> Result<()> {
 
     Ok(())
 }
+
+// http://opentsdb.net/docs/build/html/api_http/query/index.html#response
+#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
+pub struct QueryResponse {
+    pub(crate) metric: String,
+    /// A list of tags only returned when the results are for a single time
+    /// series. If results are aggregated, this value may be null or an empty
+    /// map
+    pub(crate) tags: BTreeMap<String, String>,
+    /// If more than one timeseries were included in the result set, i.e. they
+    /// were aggregated, this will display a list of tag names that were found
+    /// in common across all time series.
+    #[serde(rename = "aggregatedTags")]
+    pub(crate) aggregated_tags: Vec<String>,
+    pub(crate) dps: BTreeMap<String, f64>,
+}
+
+#[derive(Hash, Debug, Clone, Eq, PartialEq, PartialOrd, Ord)]
+struct GroupKey(Vec<String>);
+
+#[derive(Default)]
+struct QueryConverter {
+    metric: String,
+    timestamp_idx: usize,
+    value_idx: usize,
+    // (column_name, index)
+    tags_idx: Vec<(String, usize)>,
+    aggregated_tags: Vec<String>,
+    // (time_series, (tagk, tagv))
+    tags: BTreeMap<GroupKey, BTreeMap<String, String>>,
+    // (time_series, (timestamp, value))
+    values: BTreeMap<GroupKey, BTreeMap<String, f64>>,
+
+    resp: Vec<QueryResponse>,
+}
+
+impl QueryConverter {
+    fn try_new(
+        schema: &RecordSchema,
+        metric: String,
+        timestamp_col_name: String,
+        field_col_name: String,
+        tags: Vec<String>,
+        aggregated_tags: Vec<String>,
+    ) -> Result<Self> {
+        let timestamp_idx = schema
+            .index_of(&timestamp_col_name)
+            .context(InternalNoCause {
+                msg: "Timestamp column is missing in query response",
+            })?;
+
+        let value_idx = 
schema.index_of(&field_col_name).context(InternalNoCause {
+            msg: "Value column is missing in query response",
+        })?;
+
+        let tags_idx = tags
+            .iter()
+            .map(|tag| {
+                let idx = schema.index_of(tag).context(InternalNoCause {
+                    msg: format!("Tag column is missing in query response, 
tag:{}", tag),
+                })?;
+                ensure!(
+                    matches!(schema.column(idx).data_type, DatumKind::String),
+                    InternalNoCause {
+                        msg: format!(
+                            "Tag must be string type, current:{}",
+                            schema.column(idx).data_type
+                        )
+                    }
+                );
+                Ok((tag.to_string(), idx))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        ensure!(
+            schema.column(timestamp_idx).data_type.is_timestamp(),
+            InternalNoCause {
+                msg: format!(
+                    "Timestamp wrong type, current:{}",
+                    schema.column(timestamp_idx).data_type
+                )
+            }
+        );
+
+        ensure!(
+            schema.column(value_idx).data_type.is_f64_castable(),
+            InternalNoCause {
+                msg: format!(
+                    "Value must be f64 compatible type, current:{}",
+                    schema.column(value_idx).data_type
+                )
+            }
+        );
+
+        Ok(QueryConverter {
+            metric,
+            timestamp_idx,
+            value_idx,
+            tags_idx,
+            aggregated_tags,
+            ..Default::default()
+        })
+    }
+
+    fn add_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
+        let row_num = record_batch.num_rows();
+        for row_idx in 0..row_num {
+            let mut tags = BTreeMap::new();
+            // tags_key is used to identify a time series
+            let mut group_keys = Vec::with_capacity(self.tags_idx.len());
+            for (tag_key, idx) in &self.tags_idx {
+                let tag_value = record_batch
+                    .column(*idx)
+                    .datum(row_idx)
+                    .as_str()
+                    .context(InternalNoCause {
+                        msg: "Tag must be String compatible type".to_string(),
+                    })?
+                    .to_string();
+                group_keys.push(tag_value.clone());
+                tags.insert(tag_key.clone(), tag_value);
+            }
+            let group_keys = GroupKey(group_keys);
+
+            let timestamp = record_batch
+                .column(self.timestamp_idx)
+                .datum(row_idx)
+                .as_timestamp()
+                .context(InternalNoCause {
+                    msg: "Timestamp wrong type".to_string(),
+                })?
+                .as_i64()
+                .to_string();
+
+            let value = record_batch
+                .column(self.value_idx)
+                .datum(row_idx)
+                .as_f64()
+                .context(InternalNoCause {
+                    msg: "Value must be f64 compatible type".to_string(),
+                })?;
+
+            if let Some(values) = self.values.get_mut(&group_keys) {
+                values.insert(timestamp, value);
+            } else {
+                self.tags.insert(group_keys.clone(), tags);
+                self.values
+                    .entry(group_keys)
+                    .or_default()
+                    .insert(timestamp, value);
+            }
+        }
+        Ok(())
+    }
+
+    fn finish(mut self) -> Vec<QueryResponse> {
+        for (key, tags) in self.tags {
+            let dps = self.values.remove(&key).unwrap();
+            self.resp.push(QueryResponse {
+                metric: self.metric.clone(),
+                tags,
+                aggregated_tags: self.aggregated_tags.clone(),
+                dps,
+            });
+        }
+
+        self.resp
+    }
+}
+
+pub(crate) fn convert_output_to_response(
+    output: Output,
+    metric: String,
+    field_col_name: String,
+    timestamp_col_name: String,
+    tags: Vec<String>,
+    aggregated_tags: Vec<String>,
+) -> Result<Vec<QueryResponse>> {
+    let records = match output {
+        Output::Records(records) => records,
+        Output::AffectedRows(_) => {
+            return InternalNoCause {
+                msg: "output in opentsdb query should not be affected rows",
+            }
+            .fail()
+        }
+    };
+
+    let mut converter = match records.first() {
+        None => {
+            return Ok(Vec::new());
+        }
+        Some(batch) => {
+            let record_schema = batch.schema();
+            QueryConverter::try_new(
+                record_schema,
+                metric,
+                timestamp_col_name,
+                field_col_name,
+                tags,
+                aggregated_tags,
+            )?
+        }
+    };
+
+    for record in records {
+        converter.add_batch(record)?;
+    }
+
+    Ok(converter.finish())
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::{
+        array::{ArrayRef, Float64Array, StringArray, 
TimestampMillisecondArray, UInt64Array},
+        record_batch::RecordBatch as ArrowRecordBatch,
+    };
+    use common_types::{
+        column_schema,
+        datum::DatumKind,
+        record_batch::RecordBatch,
+        schema::{self, Schema, TIMESTAMP_COLUMN, TSID_COLUMN},
+    };
+    use interpreters::RecordBatchVec;
+    use query_frontend::opentsdb::DEFAULT_FIELD;
+
+    use super::*;
+
+    fn build_schema() -> Schema {
+        schema::Builder::new()
+            .auto_increment_column_id(true)
+            .primary_key_indexes(vec![0, 1])
+            .add_key_column(
+                column_schema::Builder::new(TSID_COLUMN.to_string(), 
DatumKind::UInt64)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_key_column(
+                column_schema::Builder::new(TIMESTAMP_COLUMN.to_string(), 
DatumKind::Timestamp)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new(DEFAULT_FIELD.to_string(), 
DatumKind::Double)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("tag1".to_string(), 
DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("tag2".to_string(), 
DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .build()
+            .unwrap()
+    }
+
+    fn build_record_batch(schema: &Schema) -> RecordBatchVec {
+        let tsid: ArrayRef = Arc::new(UInt64Array::from(vec![1, 1, 2, 3, 3]));
+        let timestamp: ArrayRef = 
Arc::new(TimestampMillisecondArray::from(vec![
+            11111111, 11111112, 11111113, 11111111, 11111112,
+        ]));
+        let values: ArrayRef =
+            Arc::new(Float64Array::from(vec![100.0, 101.0, 200.0, 300.0, 
301.0]));
+        let tag1: ArrayRef = Arc::new(StringArray::from(vec!["a", "a", "b", 
"c", "c"]));
+        let tag2: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "y", 
"z", "z"]));
+
+        let batch = ArrowRecordBatch::try_new(
+            schema.to_arrow_schema_ref(),
+            vec![tsid, timestamp, values, tag1, tag2],
+        )
+        .unwrap();
+
+        vec![RecordBatch::try_from(batch).unwrap()]
+    }
+
+    #[test]
+    fn test_convert_output_to_response() {
+        let metric = "metric".to_string();
+        let tags = vec!["tag1".to_string(), "tag2".to_string()];
+        let schema = build_schema();
+        let record_batch = build_record_batch(&schema);
+        let result = convert_output_to_response(
+            Output::Records(record_batch),
+            metric.clone(),
+            DEFAULT_FIELD.to_string(),
+            TIMESTAMP_COLUMN.to_string(),
+            tags,
+            vec![],
+        )
+        .unwrap();
+
+        assert_eq!(
+            vec![
+                QueryResponse {
+                    metric: metric.clone(),
+                    tags: vec![
+                        ("tag1".to_string(), "a".to_string()),
+                        ("tag2".to_string(), "x".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    aggregated_tags: vec![],
+                    dps: vec![
+                        ("11111111".to_string(), 100.0),
+                        ("11111112".to_string(), 101.0),
+                    ]
+                    .into_iter()
+                    .collect(),
+                },
+                QueryResponse {
+                    metric: metric.clone(),
+                    tags: vec![
+                        ("tag1".to_string(), "b".to_string()),
+                        ("tag2".to_string(), "y".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    aggregated_tags: vec![],
+                    dps: vec![("11111113".to_string(), 
200.0),].into_iter().collect(),
+                },
+                QueryResponse {
+                    metric: metric.clone(),
+                    tags: vec![
+                        ("tag1".to_string(), "c".to_string()),
+                        ("tag2".to_string(), "z".to_string()),
+                    ]
+                    .into_iter()
+                    .collect(),
+                    aggregated_tags: vec![],
+                    dps: vec![
+                        ("11111111".to_string(), 300.0),
+                        ("11111112".to_string(), 301.0),
+                    ]
+                    .into_iter()
+                    .collect(),
+                },
+            ],
+            result
+        );
+    }
+}
diff --git a/src/query_frontend/Cargo.toml b/src/query_frontend/Cargo.toml
index 34751e61..7956b63e 100644
--- a/src/query_frontend/Cargo.toml
+++ b/src/query_frontend/Cargo.toml
@@ -62,6 +62,8 @@ prom-remote-api = { workspace = true }
 regex = { workspace = true }
 regex-syntax = "0.6.28"
 runtime = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
 snafu = { workspace = true }
 sqlparser = { workspace = true }
 table_engine = { workspace = true }
diff --git a/src/query_frontend/src/lib.rs 
b/src/query_frontend/src/datafusion_util.rs
similarity index 55%
copy from src/query_frontend/src/lib.rs
copy to src/query_frontend/src/datafusion_util.rs
index 9ca16a6e..561c21c6 100644
--- a/src/query_frontend/src/lib.rs
+++ b/src/query_frontend/src/datafusion_util.rs
@@ -15,23 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#![feature(once_cell_try)]
+use common_types::{schema::TSID_COLUMN, time::TimeRange};
+use datafusion::{
+    logical_expr::Between,
+    prelude::{col, lit, Expr},
+};
 
-//! SQL frontend
-//!
-//! Parse sql into logical plan that can be handled by interpreters
+pub fn timerange_to_expr(query_range: &TimeRange, column_name: &str) -> Expr {
+    Expr::Between(Between {
+        expr: Box::new(col(column_name)),
+        negated: false,
+        low: Box::new(lit(query_range.inclusive_start().as_i64())),
+        high: Box::new(lit(query_range.exclusive_end().as_i64() - 1)),
+    })
+}
 
-pub mod ast;
-pub mod config;
-pub mod container;
-pub mod frontend;
-pub mod influxql;
-mod logical_optimizer;
-pub mod parser;
-mod partition;
-pub mod plan;
-pub mod planner;
-pub mod promql;
-pub mod provider;
-#[cfg(any(test, feature = "test"))]
-pub mod tests;
+pub fn default_sort_exprs(timestamp_column: &str) -> Vec<Expr> {
+    vec![
+        col(TSID_COLUMN).sort(true, true),
+        col(timestamp_column).sort(true, true),
+    ]
+}
diff --git a/src/query_frontend/src/frontend.rs 
b/src/query_frontend/src/frontend.rs
index 5743b234..0c1daaf0 100644
--- a/src/query_frontend/src/frontend.rs
+++ b/src/query_frontend/src/frontend.rs
@@ -33,6 +33,7 @@ use table_engine::table;
 use crate::{
     ast::{Statement, TableName},
     config::DynamicConfig,
+    opentsdb::types::{OpentsdbQueryPlan, QueryRequest},
     parser::Parser,
     plan::Plan,
     planner::Planner,
@@ -196,6 +197,20 @@ impl<P: MetaProvider> Frontend<P> {
         planner.influxql_stmt_to_plan(stmt).context(CreatePlan)
     }
 
+    pub fn opentsdb_query_to_plan(
+        &self,
+        ctx: &Context,
+        query: QueryRequest,
+    ) -> Result<OpentsdbQueryPlan> {
+        let planner = Planner::new(
+            &self.provider,
+            ctx.request_id.clone(),
+            ctx.read_parallelism,
+            self.dyn_config.as_ref(),
+        );
+        planner.opentsdb_query_to_plan(query).context(CreatePlan)
+    }
+
     pub fn write_req_to_plan(
         &self,
         ctx: &Context,
diff --git a/src/query_frontend/src/lib.rs b/src/query_frontend/src/lib.rs
index 9ca16a6e..ea8a9ac8 100644
--- a/src/query_frontend/src/lib.rs
+++ b/src/query_frontend/src/lib.rs
@@ -24,9 +24,12 @@
 pub mod ast;
 pub mod config;
 pub mod container;
+mod datafusion_util;
 pub mod frontend;
 pub mod influxql;
+
 mod logical_optimizer;
+pub mod opentsdb;
 pub mod parser;
 mod partition;
 pub mod plan;
diff --git a/src/query_frontend/src/opentsdb/mod.rs 
b/src/query_frontend/src/opentsdb/mod.rs
new file mode 100644
index 00000000..6a297141
--- /dev/null
+++ b/src/query_frontend/src/opentsdb/mod.rs
@@ -0,0 +1,417 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, sync::Arc};
+
+use common_types::{
+    schema::Schema,
+    time::{TimeRange, Timestamp},
+};
+use datafusion::{
+    error::DataFusionError,
+    logical_expr::LogicalPlanBuilder,
+    optimizer::utils::conjunction,
+    prelude::{avg, count, ident, lit, max, min, stddev, sum, Expr},
+    sql::{planner::ContextProvider, TableReference},
+};
+use macros::define_result;
+use snafu::{OptionExt, ResultExt, Snafu};
+
+use self::types::{Filter, OpentsdbQueryPlan, OpentsdbSubPlan, QueryRequest, 
SubQuery};
+use crate::{
+    config::DynamicConfig,
+    datafusion_util::{default_sort_exprs, timerange_to_expr},
+    logical_optimizer::optimize_plan,
+    plan::{Plan, QueryPlan},
+    provider::{ContextProviderAdapter, MetaProvider},
+};
+
+pub mod types;
+
+pub const DEFAULT_FIELD: &str = "value";
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Table provider not found, table:{name}, err:{source}"))]
+    TableProviderNotFound {
+        name: String,
+        source: DataFusionError,
+    },
+
+    #[snafu(display("Failed to build schema, err:{source}"))]
+    BuildTableSchema { source: common_types::schema::Error },
+
+    #[snafu(display("Query end should larger than start, start:{start}, 
end:{end}"))]
+    InvalidRange { start: i64, end: i64 },
+
+    #[snafu(display("Invalid filter, value:{filter_type}"))]
+    InvalidFilter { filter_type: String },
+
+    #[snafu(display("Invalid aggregator, value:{aggr}"))]
+    InvalidAggregator { aggr: String },
+
+    #[snafu(display("Failed to build plan, source:{source}"))]
+    BuildPlanError { source: DataFusionError },
+
+    #[snafu(display("MetaProvider {msg}, err:{source}"))]
+    MetaProviderError {
+        msg: String,
+        source: crate::provider::Error,
+    },
+}
+
+impl From<DataFusionError> for Error {
+    fn from(df_err: DataFusionError) -> Self {
+        Error::BuildPlanError { source: df_err }
+    }
+}
+
+define_result!(Error);
+
+fn normalize_filters(
+    tags: HashMap<String, String>,
+    filters: Vec<Filter>,
+) -> Result<(Vec<String>, Vec<Expr>)> {
+    let mut groupby_col_names = Vec::new();
+    let mut exprs = Vec::with_capacity(tags.len() + filters.len());
+    for (tagk, tagv) in tags {
+        exprs.push(ident(&tagk).eq(lit(tagv)));
+        groupby_col_names.push(tagk);
+    }
+
+    for filter in filters {
+        let col_name = ident(&filter.tagk);
+
+        if filter.group_by {
+            groupby_col_names.push(filter.tagk);
+        }
+
+        // http://opentsdb.net/docs/build/html/user_guide/query/filters.html
+        let expr = match filter.r#type.as_str() {
+            "literal_or" => {
+                let vs = filter.filter.split('|').map(lit).collect();
+                col_name.in_list(vs, false)
+            }
+            "not_literal_or" => {
+                let vs = filter.filter.split('|').map(lit).collect();
+                col_name.in_list(vs, true)
+            }
+            filter_type => return InvalidFilter { filter_type }.fail(),
+        };
+        exprs.push(expr);
+    }
+
+    Ok((groupby_col_names, exprs))
+}
+
+fn build_aggr_expr(aggr: &str) -> Result<Option<Expr>> {
+    // http://opentsdb.net/docs/build/html/user_guide/query/aggregators.html
+    let aggr = match aggr {
+        "sum" => sum(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+        "count" => count(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+        "avg" => avg(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+        "min" => min(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+        "max" => max(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+        "dev" => stddev(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+        "none" => return Ok(None),
+        _ => return InvalidAggregator { aggr }.fail(),
+    };
+
+    Ok(Some(aggr))
+}
+
+pub fn subquery_to_plan<P: MetaProvider>(
+    meta_provider: ContextProviderAdapter<'_, P>,
+    query_range: &TimeRange,
+    sub_query: SubQuery,
+) -> Result<OpentsdbSubPlan> {
+    let metric = sub_query.metric;
+    let table_provider = meta_provider
+        .get_table_source(TableReference::bare(&metric))
+        .context(TableProviderNotFound { name: &metric })?;
+    let schema = 
Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
+
+    let timestamp_col_name = schema.timestamp_name();
+    let mut tags = schema
+        .columns()
+        .iter()
+        .filter(|column| column.is_tag)
+        .map(|column| column.name.clone())
+        .collect::<Vec<_>>();
+    let (mut groupby_col_names, filter_exprs) = {
+        let (groupby, mut filters) = normalize_filters(sub_query.tags, 
sub_query.filters)?;
+        filters.push(timerange_to_expr(query_range, timestamp_col_name));
+        let anded_filters = conjunction(filters).expect("at least one 
filter(timestamp)");
+        (groupby, anded_filters)
+    };
+    let sort_exprs = default_sort_exprs(timestamp_col_name);
+
+    // TODO: support projection since there are multiple field columns.
+    let mut builder = LogicalPlanBuilder::scan(metric.clone(), table_provider, 
None)?
+        .filter(filter_exprs)?
+        .sort(sort_exprs)?;
+
+    match build_aggr_expr(&sub_query.aggregator)? {
+        Some(aggr_expr) => {
+            let mut group_expr = 
groupby_col_names.iter().map(ident).collect::<Vec<_>>();
+            group_expr.push(ident(timestamp_col_name));
+            builder = builder.aggregate(group_expr, [aggr_expr])?;
+            tags = groupby_col_names.clone();
+        }
+        None => groupby_col_names.clear(),
+    }
+
+    let df_plan = builder.build().context(BuildPlanError)?;
+    let df_plan = optimize_plan(&df_plan).context(BuildPlanError)?;
+
+    let tables = Arc::new(
+        meta_provider
+            .try_into_container()
+            .context(MetaProviderError {
+                msg: "Failed to find meta",
+            })?,
+    );
+
+    Ok(OpentsdbSubPlan {
+        plan: Plan::Query(QueryPlan {
+            df_plan,
+            table_name: Some(metric.clone()),
+            tables,
+        }),
+        metric,
+        timestamp_col_name: timestamp_col_name.to_string(),
+        field_col_name: DEFAULT_FIELD.to_string(),
+        tags,
+        aggregated_tags: groupby_col_names,
+    })
+}
+
+pub fn opentsdb_query_to_plan<P: MetaProvider>(
+    query: QueryRequest,
+    provider: &P,
+    read_parallelism: usize,
+    dyn_config: &DynamicConfig,
+) -> Result<OpentsdbQueryPlan> {
+    let range = TimeRange::new(Timestamp::new(query.start), 
Timestamp::new(query.end + 1))
+        .context(InvalidRange {
+            start: query.start,
+            end: query.end,
+        })?;
+
+    let plans = query
+        .queries
+        .into_iter()
+        .map(|sub_query| {
+            subquery_to_plan(
+                ContextProviderAdapter::new(provider, read_parallelism, 
dyn_config),
+                &range,
+                sub_query,
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(OpentsdbQueryPlan { plans })
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+    use crate::tests::MockMetaProvider;
+
+    #[test]
+    fn test_normalize_filters() {
+        let mut tags = HashMap::new();
+        tags.insert("tag1".to_string(), "tag1_value".to_string());
+
+        let (groupby_col_names, exprs) = normalize_filters(tags.clone(), 
vec![]).unwrap();
+        assert_eq!(groupby_col_names, vec!["tag1"]);
+        assert_eq!(exprs.len(), 1);
+
+        let (groupby_col_names, exprs) = normalize_filters(
+            tags.clone(),
+            vec![Filter {
+                r#type: "literal_or".to_string(),
+                tagk: "tag2".to_string(),
+                filter: "tag2_value|tag2_value2".to_string(),
+                group_by: false,
+            }],
+        )
+        .unwrap();
+        assert_eq!(groupby_col_names, vec!["tag1"]);
+        assert_eq!(exprs.len(), 2);
+
+        let (groupby_col_names, exprs) = normalize_filters(
+            tags.clone(),
+            vec![Filter {
+                r#type: "literal_or".to_string(),
+                tagk: "tag2".to_string(),
+                filter: "tag2_value|tag2_value2".to_string(),
+                group_by: true,
+            }],
+        )
+        .unwrap();
+        assert_eq!(groupby_col_names, vec!["tag1", "tag2"]);
+        assert_eq!(exprs.len(), 2);
+
+        let (groupby_col_names, exprs) = normalize_filters(
+            tags.clone(),
+            vec![Filter {
+                r#type: "not_literal_or".to_string(),
+                tagk: "tag2".to_string(),
+                filter: "tag2_value|tag2_value2".to_string(),
+                group_by: false,
+            }],
+        )
+        .unwrap();
+        assert_eq!(groupby_col_names, vec!["tag1"]);
+        assert_eq!(exprs.len(), 2);
+    }
+
+    #[test]
+    fn test_build_aggr_expr() {
+        let aggr = build_aggr_expr("sum").unwrap().unwrap();
+        assert_eq!(aggr.to_string(), "SUM(value) AS value");
+
+        let aggr = build_aggr_expr("count").unwrap().unwrap();
+        assert_eq!(aggr.to_string(), "COUNT(value) AS value");
+
+        let aggr = build_aggr_expr("avg").unwrap().unwrap();
+        assert_eq!(aggr.to_string(), "AVG(value) AS value");
+
+        let aggr = build_aggr_expr("min").unwrap().unwrap();
+        assert_eq!(aggr.to_string(), "MIN(value) AS value");
+
+        let aggr = build_aggr_expr("max").unwrap().unwrap();
+        assert_eq!(aggr.to_string(), "MAX(value) AS value");
+
+        let aggr = build_aggr_expr("dev").unwrap().unwrap();
+        assert_eq!(aggr.to_string(), "STDDEV(value) AS value");
+
+        let aggr = build_aggr_expr("none").unwrap();
+        assert!(aggr.is_none());
+
+        let err = build_aggr_expr("invalid").unwrap_err();
+        assert!(err.to_string().contains("Invalid aggregator"));
+    }
+
+    #[test]
+    fn test_subquery_to_plan() {
+        let meta_provider = MockMetaProvider::default();
+
+        let query_range = TimeRange::new(Timestamp::new(0), 
Timestamp::new(100)).unwrap();
+        let sub_query = SubQuery {
+            metric: "metric".to_string(),
+            aggregator: "sum".to_string(),
+            rate: false,
+            downsample: None,
+            tags: vec![("tag1".to_string(), "tag1_value".to_string())]
+                .into_iter()
+                .collect(),
+            filters: vec![Filter {
+                r#type: "literal_or".to_string(),
+                tagk: "tag2".to_string(),
+                filter: "tag2_value|tag2_value2".to_string(),
+                group_by: true,
+            }],
+        };
+
+        let plan = subquery_to_plan(
+            ContextProviderAdapter::new(&meta_provider, 1, 
&Default::default()),
+            &query_range,
+            sub_query,
+        )
+        .unwrap();
+
+        assert_eq!(plan.metric, "metric");
+        assert_eq!(plan.field_col_name, "value");
+        assert_eq!(plan.timestamp_col_name, "timestamp");
+        assert_eq!(plan.tags, vec!["tag1", "tag2"]);
+        assert_eq!(plan.aggregated_tags, vec!["tag1", "tag2"]);
+
+        let df_plan = match plan.plan {
+            Plan::Query(QueryPlan { df_plan, .. }) => df_plan,
+            _ => panic!("expect query plan"),
+        };
+        assert_eq!(df_plan.schema().fields().len(), 4);
+    }
+
+    #[test]
+    fn test_opentsdb_query_to_plan() {
+        let meta_provider = MockMetaProvider::default();
+
+        let query = QueryRequest {
+            start: 0,
+            end: 100,
+            queries: vec![
+                SubQuery {
+                    metric: "metric".to_string(),
+                    aggregator: "none".to_string(),
+                    rate: false,
+                    downsample: None,
+                    tags: vec![("tag1".to_string(), "tag1_value".to_string())]
+                        .into_iter()
+                        .collect(),
+                    filters: vec![],
+                },
+                SubQuery {
+                    metric: "metric".to_string(),
+                    aggregator: "sum".to_string(),
+                    rate: false,
+                    downsample: None,
+                    tags: vec![("tag1".to_string(), "tag1_value".to_string())]
+                        .into_iter()
+                        .collect(),
+                    filters: vec![],
+                },
+            ],
+            ms_resolution: false,
+        };
+
+        let plan =
+            opentsdb_query_to_plan(query, &meta_provider, 1, 
&DynamicConfig::default()).unwrap();
+
+        assert_eq!(plan.plans.len(), 2);
+
+        let plan0 = &plan.plans[0];
+        assert_eq!(plan0.metric, "metric");
+        assert_eq!(plan0.field_col_name, "value");
+        assert_eq!(plan0.timestamp_col_name, "timestamp");
+        assert_eq!(plan0.tags, vec!["tag1", "tag2"]);
+        assert!(plan0.aggregated_tags.is_empty());
+
+        let df_plan = match &plan0.plan {
+            Plan::Query(QueryPlan { df_plan, .. }) => df_plan,
+            _ => panic!("expect query plan"),
+        };
+        assert_eq!(df_plan.schema().fields().len(), 5);
+
+        let plan1 = &plan.plans[1];
+        assert_eq!(plan1.metric, "metric");
+        assert_eq!(plan1.field_col_name, "value");
+        assert_eq!(plan1.timestamp_col_name, "timestamp");
+        assert_eq!(plan1.tags, vec!["tag1"]);
+        assert_eq!(plan1.aggregated_tags, vec!["tag1"]);
+
+        let df_plan = match &plan1.plan {
+            Plan::Query(QueryPlan { df_plan, .. }) => df_plan,
+            _ => panic!("expect query plan"),
+        };
+        assert_eq!(df_plan.schema().fields().len(), 3);
+    }
+}
diff --git a/src/query_frontend/src/opentsdb/types.rs 
b/src/query_frontend/src/opentsdb/types.rs
new file mode 100644
index 00000000..791ee7c2
--- /dev/null
+++ b/src/query_frontend/src/opentsdb/types.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+use crate::plan::Plan;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Filter {
+    pub r#type: String,
+    pub tagk: String,
+    pub filter: String,
+    #[serde(rename = "groupBy")]
+    pub group_by: bool,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct SubQuery {
+    pub metric: String,
+    pub aggregator: String,
+    #[serde(default)]
+    pub rate: bool,
+    pub downsample: Option<String>,
+    #[serde(default)]
+    pub tags: HashMap<String, String>,
+    #[serde(default)]
+    pub filters: Vec<Filter>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct QueryRequest {
+    pub start: i64,
+    pub end: i64,
+    pub queries: Vec<SubQuery>,
+    #[serde(rename = "msResolution", default)]
+    pub ms_resolution: bool,
+}
+
+pub struct OpentsdbSubPlan {
+    pub plan: Plan,
+    pub metric: String,
+    pub field_col_name: String,
+    pub timestamp_col_name: String,
+    pub tags: Vec<String>,
+    pub aggregated_tags: Vec<String>,
+}
+
+pub struct OpentsdbQueryPlan {
+    pub plans: Vec<OpentsdbSubPlan>,
+}
diff --git a/src/query_frontend/src/planner.rs 
b/src/query_frontend/src/planner.rs
index 9fc7ade9..b5d0fb50 100644
--- a/src/query_frontend/src/planner.rs
+++ b/src/query_frontend/src/planner.rs
@@ -72,6 +72,10 @@ use crate::{
     container::TableReference,
     frontend::parse_table_name_with_standard,
     logical_optimizer::optimize_plan,
+    opentsdb::{
+        opentsdb_query_to_plan,
+        types::{OpentsdbQueryPlan, QueryRequest},
+    },
     parser,
     partition::PartitionParser,
     plan::{
@@ -250,6 +254,9 @@ pub enum Error {
     #[snafu(display("Failed to build plan from promql, error:{}", source))]
     BuildPromPlanError { source: crate::promql::Error },
 
+    #[snafu(display("Failed to build opentsdb plan, error:{}", source))]
+    BuildOpentsdbPlanError { source: crate::opentsdb::Error },
+
     #[snafu(display(
         "Failed to cast default value expr to column type, expr:{}, from:{}, 
to:{}",
         expr,
@@ -381,6 +388,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
             .context(BuildInfluxqlPlan)
     }
 
+    pub fn opentsdb_query_to_plan(&self, query: QueryRequest) -> 
Result<OpentsdbQueryPlan> {
+        opentsdb_query_to_plan(query, self.provider, self.read_parallelism, 
self.dyn_config)
+            .context(BuildOpentsdbPlanError)
+    }
+
     pub fn write_req_to_plan(
         &self,
         schema_config: &SchemaConfig,
diff --git a/src/query_frontend/src/tests.rs b/src/query_frontend/src/tests.rs
index 7d180e6d..d110a416 100644
--- a/src/query_frontend/src/tests.rs
+++ b/src/query_frontend/src/tests.rs
@@ -18,7 +18,9 @@
 use std::sync::Arc;
 
 use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA};
-use common_types::tests::{build_default_value_schema, build_schema, 
build_schema_for_cpu};
+use common_types::tests::{
+    build_default_value_schema, build_schema, build_schema_for_cpu, 
build_schema_for_metric,
+};
 use datafusion::catalog::TableReference;
 use df_operator::{scalar::ScalarUdf, udaf::AggregateUdf};
 use partition_table_engine::test_util::PartitionedMemoryTable;
@@ -86,6 +88,13 @@ impl Default for MockMetaProvider {
                 )),
                 // Used in `test_partitioned_table_query_to_plan`
                 Arc::new(test_partitioned_table),
+                // Used in `test_subquery_to_plan` and 
`test_opentsdb_query_to_plan`
+                Arc::new(MemoryTable::new(
+                    "metric".to_string(),
+                    TableId::from(106),
+                    build_schema_for_metric(),
+                    ANALYTIC_ENGINE_TYPE.to_string(),
+                )),
             ],
         }
     }
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 43a1910f..95ce7a18 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -417,7 +417,8 @@ impl Service {
         warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api))
     }
 
-    // POST /opentsdb/api/put
+    /// Expose `/opentsdb/api/put` and `/opentsdb/api/query` to serve opentsdb
+    /// API
     fn opentsdb_api(
         &self,
     ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + 
Clone {
@@ -455,7 +456,21 @@ impl Service {
                 }
             });
 
-        warp::path!("opentsdb" / "api" / ..).and(put_api)
+        let query_api = warp::path!("query")
+            .and(warp::post())
+            .and(body_limit)
+            .and(self.with_context())
+            .and(warp::body::json())
+            .and(self.with_proxy())
+            .and_then(|ctx, req, proxy: Arc<Proxy>| async move {
+                let result = proxy.handle_opentsdb_query(ctx, req).await;
+                match result {
+                    Ok(res) => Ok(reply::json(&res)),
+                    Err(e) => Err(reject::custom(e)),
+                }
+            });
+
+        warp::path!("opentsdb" / "api" / ..).and(put_api.or(query_api))
     }
 
     // POST /debug/flush_memtable


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to