This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new a1db8828 feat: implement opentsdb query (#1453)
a1db8828 is described below
commit a1db882892013448c2ba93f6bee620aa2b021201
Author: 鲍金日 <[email protected]>
AuthorDate: Fri Mar 29 18:45:33 2024 +0800
feat: implement opentsdb query (#1453)
## Rationale
The OpenTSDB write protocol is already supported; this PR implements the query
protocol.
## Detailed Changes
- Convert opentsdb query requests into datafusion logical plans
- Convert the query results from RecordBatch format into the response
format expected by OpenTSDB query requests
## Test Plan
- Existing tests
- Add new unit tests and integration tests
---------
Co-authored-by: jiacai2050 <[email protected]>
---
Cargo.lock | 2 +
.../cases/env/local/opentsdb/basic.result | 106 ++++++
.../cases/env/local/opentsdb/basic.sql | 93 +++++
integration_tests/src/database.rs | 28 ++
src/common_types/src/tests.rs | 41 ++
src/proxy/src/opentsdb/mod.rs | 97 ++++-
src/proxy/src/opentsdb/types.rs | 372 +++++++++++++++++-
src/query_frontend/Cargo.toml | 2 +
.../src/{lib.rs => datafusion_util.rs} | 37 +-
src/query_frontend/src/frontend.rs | 15 +
src/query_frontend/src/lib.rs | 3 +
src/query_frontend/src/opentsdb/mod.rs | 417 +++++++++++++++++++++
src/query_frontend/src/opentsdb/types.rs | 66 ++++
src/query_frontend/src/planner.rs | 12 +
src/query_frontend/src/tests.rs | 11 +-
src/server/src/http.rs | 19 +-
16 files changed, 1288 insertions(+), 33 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index d6c3ead8..e8e6fa7e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5572,6 +5572,8 @@ dependencies = [
"regex-syntax 0.6.29",
"runtime",
"schema",
+ "serde",
+ "serde_json",
"snafu 0.6.10",
"sqlparser",
"table_engine",
diff --git a/integration_tests/cases/env/local/opentsdb/basic.result
b/integration_tests/cases/env/local/opentsdb/basic.result
new file mode 100644
index 00000000..d3f7f444
--- /dev/null
+++ b/integration_tests/cases/env/local/opentsdb/basic.result
@@ -0,0 +1,106 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+DROP TABLE IF EXISTS `opentsdb_table1`;
+
+affected_rows: 0
+
+CREATE TABLE `opentsdb_table1` (
+ `time` timestamp NOT NULL,
+ `level_description` string TAG,
+ `location` string TAG,
+ `value` double,
+ timestamp KEY (time)) ENGINE = Analytic WITH (
+ enable_ttl = 'false'
+);
+
+affected_rows: 0
+
+-- Insert Records:
+-- ("2015-08-18T00:00:00Z", "between 6 and 9 feet", "coyote_creek", 8.12),
+-- ("2015-08-18T00:00:00Z", "below 3 feet", "santa_monica", 2.064),
+-- ("2015-08-18T00:06:00Z", "between 6 and 9 feet", "coyote_creek", 8.005),
+-- ("2015-08-18T00:06:00Z", "below 3 feet", "santa_monica", 2.116),
+-- ("2015-08-18T00:12:00Z", "between 6 and 9 feet", "coyote_creek", 7.887),
+-- ("2015-08-18T00:12:00Z", "below 3 feet", "santa_monica", 2.028);
+INSERT INTO opentsdb_table1(time, level_description, location, value)
+ VALUES
+ (1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
+ (1439827200000, "below 3 feet", "santa_monica", 2.064),
+ (1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
+ (1439827560000, "below 3 feet", "santa_monica", 2.116),
+ (1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
+ (1439827620000, "below 3 feet", "santa_monica", 2.028);
+
+affected_rows: 6
+
+-- SQLNESS ARG protocol=opentsdb
+{
+ "start": 1439827200000,
+ "end": 1439827620000,
+ "queries": [
+ {
+ "aggregator": "none",
+ "metric": "opentsdb_table1",
+ "tags": {}
+ }
+ ]
+}
+;
+
+[{"metric":"opentsdb_table1","tags":{"level_description":"below 3
feet","location":"santa_monica"},"aggregatedTags":[],"dps":{"1439827200000":2.064,"1439827560000":2.116,"1439827620000":2.028}},{"metric":"opentsdb_table1","tags":{"level_description":"between
6 and 9
feet","location":"coyote_creek"},"aggregatedTags":[],"dps":{"1439827200000":8.12,"1439827560000":8.005,"1439827620000":7.887}}]
+
+-- SQLNESS ARG protocol=opentsdb
+{
+ "start": 1439827200000,
+ "end": 1439827620000,
+ "queries": [
+ {
+ "aggregator": "none",
+ "metric": "opentsdb_table1",
+ "tags": {
+ "location": "coyote_creek"
+ }
+ }
+ ]
+}
+;
+
+[{"metric":"opentsdb_table1","tags":{"level_description":"between 6 and 9
feet","location":"coyote_creek"},"aggregatedTags":[],"dps":{"1439827200000":8.12,"1439827560000":8.005,"1439827620000":7.887}}]
+
+-- SQLNESS ARG protocol=opentsdb
+{
+ "start": 1439827200000,
+ "end": 1439827620000,
+ "queries": [
+ {
+ "aggregator": "sum",
+ "metric": "opentsdb_table1",
+ "tags": {
+ }
+ }
+ ]
+}
+;
+
+[{"metric":"opentsdb_table1","tags":{},"aggregatedTags":[],"dps":{"1439827200000":10.184,"1439827560000":10.121,"1439827620000":9.915}}]
+
+DROP TABLE IF EXISTS `opentsdb_table1`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/env/local/opentsdb/basic.sql
b/integration_tests/cases/env/local/opentsdb/basic.sql
new file mode 100644
index 00000000..5ab91ec5
--- /dev/null
+++ b/integration_tests/cases/env/local/opentsdb/basic.sql
@@ -0,0 +1,93 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+DROP TABLE IF EXISTS `opentsdb_table1`;
+
+CREATE TABLE `opentsdb_table1` (
+ `time` timestamp NOT NULL,
+ `level_description` string TAG,
+ `location` string TAG,
+ `value` double,
+ timestamp KEY (time)) ENGINE = Analytic WITH (
+ enable_ttl = 'false'
+);
+
+-- Insert Records:
+-- ("2015-08-18T00:00:00Z", "between 6 and 9 feet", "coyote_creek", 8.12),
+-- ("2015-08-18T00:00:00Z", "below 3 feet", "santa_monica", 2.064),
+-- ("2015-08-18T00:06:00Z", "between 6 and 9 feet", "coyote_creek", 8.005),
+-- ("2015-08-18T00:06:00Z", "below 3 feet", "santa_monica", 2.116),
+-- ("2015-08-18T00:12:00Z", "between 6 and 9 feet", "coyote_creek", 7.887),
+-- ("2015-08-18T00:12:00Z", "below 3 feet", "santa_monica", 2.028);
+INSERT INTO opentsdb_table1(time, level_description, location, value)
+ VALUES
+ (1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
+ (1439827200000, "below 3 feet", "santa_monica", 2.064),
+ (1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
+ (1439827560000, "below 3 feet", "santa_monica", 2.116),
+ (1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
+ (1439827620000, "below 3 feet", "santa_monica", 2.028);
+
+
+-- SQLNESS ARG protocol=opentsdb
+{
+ "start": 1439827200000,
+ "end": 1439827620000,
+ "queries": [
+ {
+ "aggregator": "none",
+ "metric": "opentsdb_table1",
+ "tags": {}
+ }
+ ]
+}
+;
+
+-- SQLNESS ARG protocol=opentsdb
+{
+ "start": 1439827200000,
+ "end": 1439827620000,
+ "queries": [
+ {
+ "aggregator": "none",
+ "metric": "opentsdb_table1",
+ "tags": {
+ "location": "coyote_creek"
+ }
+ }
+ ]
+}
+;
+
+-- SQLNESS ARG protocol=opentsdb
+{
+ "start": 1439827200000,
+ "end": 1439827620000,
+ "queries": [
+ {
+ "aggregator": "sum",
+ "metric": "opentsdb_table1",
+ "tags": {
+ }
+ }
+ ]
+}
+;
+
+DROP TABLE IF EXISTS `opentsdb_table1`;
diff --git a/integration_tests/src/database.rs
b/integration_tests/src/database.rs
index c6d1b91d..2020cd84 100644
--- a/integration_tests/src/database.rs
+++ b/integration_tests/src/database.rs
@@ -243,6 +243,7 @@ pub struct HoraeDB<T> {
enum Protocol {
Sql,
InfluxQL,
+ OpenTSDB,
}
impl TryFrom<&str> for Protocol {
@@ -252,6 +253,7 @@ impl TryFrom<&str> for Protocol {
let protocol = match s {
"influxql" => Protocol::InfluxQL,
"sql" => Protocol::Sql,
+ "opentsdb" => Protocol::OpenTSDB,
_ => return Err(format!("unknown protocol:{s}")),
};
@@ -312,6 +314,10 @@ impl<T: Send + Sync> Database for HoraeDB<T> {
let http_client = self.http_client.clone();
Self::execute_influxql(query, http_client,
context.context).await
}
+ Protocol::OpenTSDB => {
+ let http_client = self.http_client.clone();
+ Self::execute_opentsdb(query, http_client,
context.context).await
+ }
}
}
}
@@ -383,6 +389,28 @@ impl<T> HoraeDB<T> {
Box::new(query_res)
}
+ async fn execute_opentsdb(
+ query: String,
+ http_client: HttpClient,
+ _params: HashMap<String, String>,
+ ) -> Box<dyn Display> {
+ let query = query.trim().trim_end_matches(';');
+ let url = format!("http://{}/opentsdb/api/query",
http_client.endpoint);
+ let resp = http_client
+ .client
+ .post(url)
+ .header("content-type", "application/json")
+ .body(query.to_string())
+ .send()
+ .await
+ .unwrap();
+ let query_res = match resp.text().await {
+ Ok(text) => text,
+ Err(e) => format!("Failed to do influxql query, err:{e:?}"),
+ };
+ Box::new(query_res)
+ }
+
async fn execute_sql(query: String, client: Arc<dyn DbClient>) -> Box<dyn
Display> {
let query_ctx = RpcContext {
database: Some("public".to_string()),
diff --git a/src/common_types/src/tests.rs b/src/common_types/src/tests.rs
index bb59a7c8..28872068 100644
--- a/src/common_types/src/tests.rs
+++ b/src/common_types/src/tests.rs
@@ -241,6 +241,47 @@ pub fn build_schema_for_cpu() -> Schema {
builder.primary_key_indexes(vec![0, 1]).build().unwrap()
}
+/// Build a schema for testing:
+/// (tsid(uint64), key2(timestamp), tag1(string), tag2(string), value(double),
+pub fn build_schema_for_metric() -> Schema {
+ let builder = schema::Builder::new()
+ .auto_increment_column_id(true)
+ .add_key_column(
+ column_schema::Builder::new(TSID_COLUMN.to_string(),
DatumKind::UInt64)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_key_column(
+ column_schema::Builder::new("timestamp".to_string(),
DatumKind::Timestamp)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_normal_column(
+ column_schema::Builder::new("tag1".to_string(), DatumKind::String)
+ .is_tag(true)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_normal_column(
+ column_schema::Builder::new("tag2".to_string(), DatumKind::String)
+ .is_tag(true)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_normal_column(
+ column_schema::Builder::new("value".to_string(), DatumKind::Double)
+ .build()
+ .unwrap(),
+ )
+ .unwrap();
+
+ builder.primary_key_indexes(vec![0, 1]).build().unwrap()
+}
+
#[allow(clippy::too_many_arguments)]
pub fn build_row_for_dictionary(
key1: &[u8],
diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs
index d9f69810..aae4a4a2 100644
--- a/src/proxy/src/opentsdb/mod.rs
+++ b/src/proxy/src/opentsdb/mod.rs
@@ -15,20 +15,32 @@
// specific language governing permissions and limitations
// under the License.
-//! This module implements [put][1] for OpenTSDB
+//! This module implements [put][1], [query][2] for OpenTSDB
//! [1]: http://opentsdb.net/docs/build/html/api_http/put.html
+//! [2]: http://opentsdb.net/docs/build/html/api_http/query/index.html
+use std::time::Instant;
+
+use futures::{stream::FuturesOrdered, StreamExt};
+use generic_error::BoxError;
use horaedbproto::storage::{
RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest,
};
use http::StatusCode;
-use logger::debug;
+use logger::{debug, info};
+use query_frontend::{
+ frontend::{Context as SqlContext, Frontend},
+ opentsdb::types::QueryRequest,
+ provider::CatalogMetaProvider,
+};
+use snafu::ResultExt;
+use self::types::QueryResponse;
use crate::{
context::RequestContext,
- error::{ErrNoCause, Result},
+ error::{ErrNoCause, ErrWithCause, Result},
metrics::HTTP_HANDLER_COUNTER_VEC,
- opentsdb::types::{convert_put_request, PutRequest, PutResponse},
+ opentsdb::types::{convert_output_to_response, convert_put_request,
PutRequest, PutResponse},
Context, Proxy,
};
@@ -41,7 +53,6 @@ impl Proxy {
req: PutRequest,
) -> Result<PutResponse> {
let write_table_requests = convert_put_request(req)?;
-
let num_rows: usize = write_table_requests
.iter()
.map(|req| {
@@ -93,4 +104,80 @@ impl Proxy {
}
}
}
+
+ pub async fn handle_opentsdb_query(
+ &self,
+ ctx: RequestContext,
+ req: QueryRequest,
+ ) -> Result<Vec<QueryResponse>> {
+ let request_id = ctx.request_id;
+ let begin_instant = Instant::now();
+ let deadline = ctx.timeout.map(|t| begin_instant + t);
+
+ info!(
+ "Opentsdb query handler try to process request, request_id:{},
request:{:?}",
+ request_id, req
+ );
+
+ let provider = CatalogMetaProvider {
+ manager: self.instance.catalog_manager.clone(),
+ default_catalog: &ctx.catalog,
+ default_schema: &ctx.schema,
+ function_registry: &*self.instance.function_registry,
+ };
+ let frontend = Frontend::new(provider,
self.instance.dyn_config.fronted.clone());
+ let sql_ctx = SqlContext::new(request_id.clone(), deadline);
+
+ let opentsdb_plan = frontend
+ .opentsdb_query_to_plan(&sql_ctx, req)
+ .box_err()
+ .with_context(|| ErrWithCause {
+ code: StatusCode::BAD_REQUEST,
+ msg: "Failed to build plan",
+ })?;
+
+ for plan in &opentsdb_plan.plans {
+ self.instance
+ .limiter
+ .try_limit(&plan.plan)
+ .box_err()
+ .context(ErrWithCause {
+ code: StatusCode::INTERNAL_SERVER_ERROR,
+ msg: "Query is blocked",
+ })?;
+ }
+
+ let mut futures = FuturesOrdered::new();
+ for plan in opentsdb_plan.plans {
+ let request_id_clone = request_id.clone();
+ let one_resp = async {
+ let output = self
+ .execute_plan(
+ request_id_clone,
+ &ctx.catalog,
+ &ctx.schema,
+ plan.plan,
+ deadline,
+ )
+ .await?;
+
+ convert_output_to_response(
+ output,
+ plan.metric,
+ plan.field_col_name,
+ plan.timestamp_col_name,
+ plan.tags,
+ plan.aggregated_tags,
+ )
+ };
+
+ futures.push_back(one_resp);
+ }
+
+ let resp = futures.collect::<Vec<_>>().await;
+ let resp = resp.into_iter().collect::<Result<Vec<_>>>()?;
+ let resp = resp.into_iter().flatten().collect();
+
+ Ok(resp)
+ }
}
diff --git a/src/proxy/src/opentsdb/types.rs b/src/proxy/src/opentsdb/types.rs
index 87f18833..bfeca1bd 100644
--- a/src/proxy/src/opentsdb/types.rs
+++ b/src/proxy/src/opentsdb/types.rs
@@ -16,24 +16,26 @@
// under the License.
use std::{
- collections::{HashMap, HashSet},
+ collections::{BTreeMap, HashMap, HashSet},
+ default::Default,
fmt::Debug,
};
use bytes::Bytes;
+use common_types::{datum::DatumKind, record_batch::RecordBatch,
schema::RecordSchema};
use generic_error::BoxError;
use horaedbproto::storage::{
value, Field, FieldGroup, Tag, Value as ProtoValue, WriteSeriesEntry,
WriteTableRequest,
};
use http::StatusCode;
-use serde::Deserialize;
+use interpreters::interpreter::Output;
+use query_frontend::opentsdb::DEFAULT_FIELD;
+use serde::{Deserialize, Serialize};
use serde_json::from_slice;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
use time_ext::try_to_millis;
-use crate::error::{ErrNoCause, ErrWithCause, Result};
-
-const OPENTSDB_DEFAULT_FIELD: &str = "value";
+use crate::error::{ErrNoCause, ErrWithCause, InternalNoCause, Result};
#[derive(Debug)]
pub struct PutRequest {
@@ -142,7 +144,7 @@ pub(crate) fn convert_put_request(req: PutRequest) ->
Result<Vec<WriteTableReque
let mut req = WriteTableRequest {
table: metric,
tag_names,
- field_names: vec![String::from(OPENTSDB_DEFAULT_FIELD)],
+ field_names: vec![String::from(DEFAULT_FIELD)],
entries: Vec::with_capacity(points.len()),
};
@@ -213,3 +215,359 @@ pub(crate) fn validate(points: &[Point]) -> Result<()> {
Ok(())
}
+
+// http://opentsdb.net/docs/build/html/api_http/query/index.html#response
+#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
+pub struct QueryResponse {
+ pub(crate) metric: String,
+ /// A list of tags only returned when the results are for a single time
+ /// series. If results are aggregated, this value may be null or an empty
+ /// map
+ pub(crate) tags: BTreeMap<String, String>,
+ /// If more than one timeseries were included in the result set, i.e. they
+ /// were aggregated, this will display a list of tag names that were found
+ /// in common across all time series.
+ #[serde(rename = "aggregatedTags")]
+ pub(crate) aggregated_tags: Vec<String>,
+ pub(crate) dps: BTreeMap<String, f64>,
+}
+
+#[derive(Hash, Debug, Clone, Eq, PartialEq, PartialOrd, Ord)]
+struct GroupKey(Vec<String>);
+
+#[derive(Default)]
+struct QueryConverter {
+ metric: String,
+ timestamp_idx: usize,
+ value_idx: usize,
+ // (column_name, index)
+ tags_idx: Vec<(String, usize)>,
+ aggregated_tags: Vec<String>,
+ // (time_series, (tagk, tagv))
+ tags: BTreeMap<GroupKey, BTreeMap<String, String>>,
+ // (time_series, (timestamp, value))
+ values: BTreeMap<GroupKey, BTreeMap<String, f64>>,
+
+ resp: Vec<QueryResponse>,
+}
+
+impl QueryConverter {
+ fn try_new(
+ schema: &RecordSchema,
+ metric: String,
+ timestamp_col_name: String,
+ field_col_name: String,
+ tags: Vec<String>,
+ aggregated_tags: Vec<String>,
+ ) -> Result<Self> {
+ let timestamp_idx = schema
+ .index_of(×tamp_col_name)
+ .context(InternalNoCause {
+ msg: "Timestamp column is missing in query response",
+ })?;
+
+ let value_idx =
schema.index_of(&field_col_name).context(InternalNoCause {
+ msg: "Value column is missing in query response",
+ })?;
+
+ let tags_idx = tags
+ .iter()
+ .map(|tag| {
+ let idx = schema.index_of(tag).context(InternalNoCause {
+ msg: format!("Tag column is missing in query response,
tag:{}", tag),
+ })?;
+ ensure!(
+ matches!(schema.column(idx).data_type, DatumKind::String),
+ InternalNoCause {
+ msg: format!(
+ "Tag must be string type, current:{}",
+ schema.column(idx).data_type
+ )
+ }
+ );
+ Ok((tag.to_string(), idx))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ ensure!(
+ schema.column(timestamp_idx).data_type.is_timestamp(),
+ InternalNoCause {
+ msg: format!(
+ "Timestamp wrong type, current:{}",
+ schema.column(timestamp_idx).data_type
+ )
+ }
+ );
+
+ ensure!(
+ schema.column(value_idx).data_type.is_f64_castable(),
+ InternalNoCause {
+ msg: format!(
+ "Value must be f64 compatible type, current:{}",
+ schema.column(value_idx).data_type
+ )
+ }
+ );
+
+ Ok(QueryConverter {
+ metric,
+ timestamp_idx,
+ value_idx,
+ tags_idx,
+ aggregated_tags,
+ ..Default::default()
+ })
+ }
+
+ fn add_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
+ let row_num = record_batch.num_rows();
+ for row_idx in 0..row_num {
+ let mut tags = BTreeMap::new();
+ // tags_key is used to identify a time series
+ let mut group_keys = Vec::with_capacity(self.tags_idx.len());
+ for (tag_key, idx) in &self.tags_idx {
+ let tag_value = record_batch
+ .column(*idx)
+ .datum(row_idx)
+ .as_str()
+ .context(InternalNoCause {
+ msg: "Tag must be String compatible type".to_string(),
+ })?
+ .to_string();
+ group_keys.push(tag_value.clone());
+ tags.insert(tag_key.clone(), tag_value);
+ }
+ let group_keys = GroupKey(group_keys);
+
+ let timestamp = record_batch
+ .column(self.timestamp_idx)
+ .datum(row_idx)
+ .as_timestamp()
+ .context(InternalNoCause {
+ msg: "Timestamp wrong type".to_string(),
+ })?
+ .as_i64()
+ .to_string();
+
+ let value = record_batch
+ .column(self.value_idx)
+ .datum(row_idx)
+ .as_f64()
+ .context(InternalNoCause {
+ msg: "Value must be f64 compatible type".to_string(),
+ })?;
+
+ if let Some(values) = self.values.get_mut(&group_keys) {
+ values.insert(timestamp, value);
+ } else {
+ self.tags.insert(group_keys.clone(), tags);
+ self.values
+ .entry(group_keys)
+ .or_default()
+ .insert(timestamp, value);
+ }
+ }
+ Ok(())
+ }
+
+ fn finish(mut self) -> Vec<QueryResponse> {
+ for (key, tags) in self.tags {
+ let dps = self.values.remove(&key).unwrap();
+ self.resp.push(QueryResponse {
+ metric: self.metric.clone(),
+ tags,
+ aggregated_tags: self.aggregated_tags.clone(),
+ dps,
+ });
+ }
+
+ self.resp
+ }
+}
+
+pub(crate) fn convert_output_to_response(
+ output: Output,
+ metric: String,
+ field_col_name: String,
+ timestamp_col_name: String,
+ tags: Vec<String>,
+ aggregated_tags: Vec<String>,
+) -> Result<Vec<QueryResponse>> {
+ let records = match output {
+ Output::Records(records) => records,
+ Output::AffectedRows(_) => {
+ return InternalNoCause {
+ msg: "output in opentsdb query should not be affected rows",
+ }
+ .fail()
+ }
+ };
+
+ let mut converter = match records.first() {
+ None => {
+ return Ok(Vec::new());
+ }
+ Some(batch) => {
+ let record_schema = batch.schema();
+ QueryConverter::try_new(
+ record_schema,
+ metric,
+ timestamp_col_name,
+ field_col_name,
+ tags,
+ aggregated_tags,
+ )?
+ }
+ };
+
+ for record in records {
+ converter.add_batch(record)?;
+ }
+
+ Ok(converter.finish())
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use arrow::{
+ array::{ArrayRef, Float64Array, StringArray,
TimestampMillisecondArray, UInt64Array},
+ record_batch::RecordBatch as ArrowRecordBatch,
+ };
+ use common_types::{
+ column_schema,
+ datum::DatumKind,
+ record_batch::RecordBatch,
+ schema::{self, Schema, TIMESTAMP_COLUMN, TSID_COLUMN},
+ };
+ use interpreters::RecordBatchVec;
+ use query_frontend::opentsdb::DEFAULT_FIELD;
+
+ use super::*;
+
+ fn build_schema() -> Schema {
+ schema::Builder::new()
+ .auto_increment_column_id(true)
+ .primary_key_indexes(vec![0, 1])
+ .add_key_column(
+ column_schema::Builder::new(TSID_COLUMN.to_string(),
DatumKind::UInt64)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_key_column(
+ column_schema::Builder::new(TIMESTAMP_COLUMN.to_string(),
DatumKind::Timestamp)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_normal_column(
+ column_schema::Builder::new(DEFAULT_FIELD.to_string(),
DatumKind::Double)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_normal_column(
+ column_schema::Builder::new("tag1".to_string(),
DatumKind::String)
+ .is_tag(true)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .add_normal_column(
+ column_schema::Builder::new("tag2".to_string(),
DatumKind::String)
+ .is_tag(true)
+ .build()
+ .unwrap(),
+ )
+ .unwrap()
+ .build()
+ .unwrap()
+ }
+
+ fn build_record_batch(schema: &Schema) -> RecordBatchVec {
+ let tsid: ArrayRef = Arc::new(UInt64Array::from(vec![1, 1, 2, 3, 3]));
+ let timestamp: ArrayRef =
Arc::new(TimestampMillisecondArray::from(vec![
+ 11111111, 11111112, 11111113, 11111111, 11111112,
+ ]));
+ let values: ArrayRef =
+ Arc::new(Float64Array::from(vec![100.0, 101.0, 200.0, 300.0,
301.0]));
+ let tag1: ArrayRef = Arc::new(StringArray::from(vec!["a", "a", "b",
"c", "c"]));
+ let tag2: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "y",
"z", "z"]));
+
+ let batch = ArrowRecordBatch::try_new(
+ schema.to_arrow_schema_ref(),
+ vec![tsid, timestamp, values, tag1, tag2],
+ )
+ .unwrap();
+
+ vec![RecordBatch::try_from(batch).unwrap()]
+ }
+
+ #[test]
+ fn test_convert_output_to_response() {
+ let metric = "metric".to_string();
+ let tags = vec!["tag1".to_string(), "tag2".to_string()];
+ let schema = build_schema();
+ let record_batch = build_record_batch(&schema);
+ let result = convert_output_to_response(
+ Output::Records(record_batch),
+ metric.clone(),
+ DEFAULT_FIELD.to_string(),
+ TIMESTAMP_COLUMN.to_string(),
+ tags,
+ vec![],
+ )
+ .unwrap();
+
+ assert_eq!(
+ vec![
+ QueryResponse {
+ metric: metric.clone(),
+ tags: vec![
+ ("tag1".to_string(), "a".to_string()),
+ ("tag2".to_string(), "x".to_string()),
+ ]
+ .into_iter()
+ .collect(),
+ aggregated_tags: vec![],
+ dps: vec![
+ ("11111111".to_string(), 100.0),
+ ("11111112".to_string(), 101.0),
+ ]
+ .into_iter()
+ .collect(),
+ },
+ QueryResponse {
+ metric: metric.clone(),
+ tags: vec![
+ ("tag1".to_string(), "b".to_string()),
+ ("tag2".to_string(), "y".to_string()),
+ ]
+ .into_iter()
+ .collect(),
+ aggregated_tags: vec![],
+ dps: vec![("11111113".to_string(),
200.0),].into_iter().collect(),
+ },
+ QueryResponse {
+ metric: metric.clone(),
+ tags: vec![
+ ("tag1".to_string(), "c".to_string()),
+ ("tag2".to_string(), "z".to_string()),
+ ]
+ .into_iter()
+ .collect(),
+ aggregated_tags: vec![],
+ dps: vec![
+ ("11111111".to_string(), 300.0),
+ ("11111112".to_string(), 301.0),
+ ]
+ .into_iter()
+ .collect(),
+ },
+ ],
+ result
+ );
+ }
+}
diff --git a/src/query_frontend/Cargo.toml b/src/query_frontend/Cargo.toml
index 34751e61..7956b63e 100644
--- a/src/query_frontend/Cargo.toml
+++ b/src/query_frontend/Cargo.toml
@@ -62,6 +62,8 @@ prom-remote-api = { workspace = true }
regex = { workspace = true }
regex-syntax = "0.6.28"
runtime = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
snafu = { workspace = true }
sqlparser = { workspace = true }
table_engine = { workspace = true }
diff --git a/src/query_frontend/src/lib.rs
b/src/query_frontend/src/datafusion_util.rs
similarity index 55%
copy from src/query_frontend/src/lib.rs
copy to src/query_frontend/src/datafusion_util.rs
index 9ca16a6e..561c21c6 100644
--- a/src/query_frontend/src/lib.rs
+++ b/src/query_frontend/src/datafusion_util.rs
@@ -15,23 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-#![feature(once_cell_try)]
+use common_types::{schema::TSID_COLUMN, time::TimeRange};
+use datafusion::{
+ logical_expr::Between,
+ prelude::{col, lit, Expr},
+};
-//! SQL frontend
-//!
-//! Parse sql into logical plan that can be handled by interpreters
+pub fn timerange_to_expr(query_range: &TimeRange, column_name: &str) -> Expr {
+ Expr::Between(Between {
+ expr: Box::new(col(column_name)),
+ negated: false,
+ low: Box::new(lit(query_range.inclusive_start().as_i64())),
+ high: Box::new(lit(query_range.exclusive_end().as_i64() - 1)),
+ })
+}
-pub mod ast;
-pub mod config;
-pub mod container;
-pub mod frontend;
-pub mod influxql;
-mod logical_optimizer;
-pub mod parser;
-mod partition;
-pub mod plan;
-pub mod planner;
-pub mod promql;
-pub mod provider;
-#[cfg(any(test, feature = "test"))]
-pub mod tests;
+pub fn default_sort_exprs(timestamp_column: &str) -> Vec<Expr> {
+ vec![
+ col(TSID_COLUMN).sort(true, true),
+ col(timestamp_column).sort(true, true),
+ ]
+}
diff --git a/src/query_frontend/src/frontend.rs
b/src/query_frontend/src/frontend.rs
index 5743b234..0c1daaf0 100644
--- a/src/query_frontend/src/frontend.rs
+++ b/src/query_frontend/src/frontend.rs
@@ -33,6 +33,7 @@ use table_engine::table;
use crate::{
ast::{Statement, TableName},
config::DynamicConfig,
+ opentsdb::types::{OpentsdbQueryPlan, QueryRequest},
parser::Parser,
plan::Plan,
planner::Planner,
@@ -196,6 +197,20 @@ impl<P: MetaProvider> Frontend<P> {
planner.influxql_stmt_to_plan(stmt).context(CreatePlan)
}
+ pub fn opentsdb_query_to_plan(
+ &self,
+ ctx: &Context,
+ query: QueryRequest,
+ ) -> Result<OpentsdbQueryPlan> {
+ let planner = Planner::new(
+ &self.provider,
+ ctx.request_id.clone(),
+ ctx.read_parallelism,
+ self.dyn_config.as_ref(),
+ );
+ planner.opentsdb_query_to_plan(query).context(CreatePlan)
+ }
+
pub fn write_req_to_plan(
&self,
ctx: &Context,
diff --git a/src/query_frontend/src/lib.rs b/src/query_frontend/src/lib.rs
index 9ca16a6e..ea8a9ac8 100644
--- a/src/query_frontend/src/lib.rs
+++ b/src/query_frontend/src/lib.rs
@@ -24,9 +24,12 @@
pub mod ast;
pub mod config;
pub mod container;
+mod datafusion_util;
pub mod frontend;
pub mod influxql;
+
mod logical_optimizer;
+pub mod opentsdb;
pub mod parser;
mod partition;
pub mod plan;
diff --git a/src/query_frontend/src/opentsdb/mod.rs
b/src/query_frontend/src/opentsdb/mod.rs
new file mode 100644
index 00000000..6a297141
--- /dev/null
+++ b/src/query_frontend/src/opentsdb/mod.rs
@@ -0,0 +1,417 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, sync::Arc};
+
+use common_types::{
+ schema::Schema,
+ time::{TimeRange, Timestamp},
+};
+use datafusion::{
+ error::DataFusionError,
+ logical_expr::LogicalPlanBuilder,
+ optimizer::utils::conjunction,
+ prelude::{avg, count, ident, lit, max, min, stddev, sum, Expr},
+ sql::{planner::ContextProvider, TableReference},
+};
+use macros::define_result;
+use snafu::{OptionExt, ResultExt, Snafu};
+
+use self::types::{Filter, OpentsdbQueryPlan, OpentsdbSubPlan, QueryRequest,
SubQuery};
+use crate::{
+ config::DynamicConfig,
+ datafusion_util::{default_sort_exprs, timerange_to_expr},
+ logical_optimizer::optimize_plan,
+ plan::{Plan, QueryPlan},
+ provider::{ContextProviderAdapter, MetaProvider},
+};
+
+pub mod types;
+
+pub const DEFAULT_FIELD: &str = "value";
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("Table provider not found, table:{name}, err:{source}"))]
+ TableProviderNotFound {
+ name: String,
+ source: DataFusionError,
+ },
+
+ #[snafu(display("Failed to build schema, err:{source}"))]
+ BuildTableSchema { source: common_types::schema::Error },
+
+ #[snafu(display("Query end should larger than start, start:{start},
end:{end}"))]
+ InvalidRange { start: i64, end: i64 },
+
+ #[snafu(display("Invalid filter, value:{filter_type}"))]
+ InvalidFilter { filter_type: String },
+
+ #[snafu(display("Invalid aggregator, value:{aggr}"))]
+ InvalidAggregator { aggr: String },
+
+ #[snafu(display("Failed to build plan, source:{source}"))]
+ BuildPlanError { source: DataFusionError },
+
+ #[snafu(display("MetaProvider {msg}, err:{source}"))]
+ MetaProviderError {
+ msg: String,
+ source: crate::provider::Error,
+ },
+}
+
+impl From<DataFusionError> for Error {
+ fn from(df_err: DataFusionError) -> Self {
+ Error::BuildPlanError { source: df_err }
+ }
+}
+
+define_result!(Error);
+
+fn normalize_filters(
+ tags: HashMap<String, String>,
+ filters: Vec<Filter>,
+) -> Result<(Vec<String>, Vec<Expr>)> {
+ let mut groupby_col_names = Vec::new();
+ let mut exprs = Vec::with_capacity(tags.len() + filters.len());
+ for (tagk, tagv) in tags {
+ exprs.push(ident(&tagk).eq(lit(tagv)));
+ groupby_col_names.push(tagk);
+ }
+
+ for filter in filters {
+ let col_name = ident(&filter.tagk);
+
+ if filter.group_by {
+ groupby_col_names.push(filter.tagk);
+ }
+
+ // http://opentsdb.net/docs/build/html/user_guide/query/filters.html
+ let expr = match filter.r#type.as_str() {
+ "literal_or" => {
+ let vs = filter.filter.split('|').map(lit).collect();
+ col_name.in_list(vs, false)
+ }
+ "not_literal_or" => {
+ let vs = filter.filter.split('|').map(lit).collect();
+ col_name.in_list(vs, true)
+ }
+ filter_type => return InvalidFilter { filter_type }.fail(),
+ };
+ exprs.push(expr);
+ }
+
+ Ok((groupby_col_names, exprs))
+}
+
+fn build_aggr_expr(aggr: &str) -> Result<Option<Expr>> {
+ // http://opentsdb.net/docs/build/html/user_guide/query/aggregators.html
+ let aggr = match aggr {
+ "sum" => sum(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+ "count" => count(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+ "avg" => avg(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+ "min" => min(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+ "max" => max(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+ "dev" => stddev(ident(DEFAULT_FIELD)).alias(DEFAULT_FIELD),
+ "none" => return Ok(None),
+ _ => return InvalidAggregator { aggr }.fail(),
+ };
+
+ Ok(Some(aggr))
+}
+
+pub fn subquery_to_plan<P: MetaProvider>(
+ meta_provider: ContextProviderAdapter<'_, P>,
+ query_range: &TimeRange,
+ sub_query: SubQuery,
+) -> Result<OpentsdbSubPlan> {
+ let metric = sub_query.metric;
+ let table_provider = meta_provider
+ .get_table_source(TableReference::bare(&metric))
+ .context(TableProviderNotFound { name: &metric })?;
+ let schema =
Schema::try_from(table_provider.schema()).context(BuildTableSchema)?;
+
+ let timestamp_col_name = schema.timestamp_name();
+ let mut tags = schema
+ .columns()
+ .iter()
+ .filter(|column| column.is_tag)
+ .map(|column| column.name.clone())
+ .collect::<Vec<_>>();
+ let (mut groupby_col_names, filter_exprs) = {
+ let (groupby, mut filters) = normalize_filters(sub_query.tags,
sub_query.filters)?;
+ filters.push(timerange_to_expr(query_range, timestamp_col_name));
+ let anded_filters = conjunction(filters).expect("at least one
filter(timestamp)");
+ (groupby, anded_filters)
+ };
+ let sort_exprs = default_sort_exprs(timestamp_col_name);
+
+ // TODO: support projection since there are multiple field columns.
+ let mut builder = LogicalPlanBuilder::scan(metric.clone(), table_provider,
None)?
+ .filter(filter_exprs)?
+ .sort(sort_exprs)?;
+
+ match build_aggr_expr(&sub_query.aggregator)? {
+ Some(aggr_expr) => {
+ let mut group_expr =
groupby_col_names.iter().map(ident).collect::<Vec<_>>();
+ group_expr.push(ident(timestamp_col_name));
+ builder = builder.aggregate(group_expr, [aggr_expr])?;
+ tags = groupby_col_names.clone();
+ }
+ None => groupby_col_names.clear(),
+ }
+
+ let df_plan = builder.build().context(BuildPlanError)?;
+ let df_plan = optimize_plan(&df_plan).context(BuildPlanError)?;
+
+ let tables = Arc::new(
+ meta_provider
+ .try_into_container()
+ .context(MetaProviderError {
+ msg: "Failed to find meta",
+ })?,
+ );
+
+ Ok(OpentsdbSubPlan {
+ plan: Plan::Query(QueryPlan {
+ df_plan,
+ table_name: Some(metric.clone()),
+ tables,
+ }),
+ metric,
+ timestamp_col_name: timestamp_col_name.to_string(),
+ field_col_name: DEFAULT_FIELD.to_string(),
+ tags,
+ aggregated_tags: groupby_col_names,
+ })
+}
+
+pub fn opentsdb_query_to_plan<P: MetaProvider>(
+ query: QueryRequest,
+ provider: &P,
+ read_parallelism: usize,
+ dyn_config: &DynamicConfig,
+) -> Result<OpentsdbQueryPlan> {
+ let range = TimeRange::new(Timestamp::new(query.start),
Timestamp::new(query.end + 1))
+ .context(InvalidRange {
+ start: query.start,
+ end: query.end,
+ })?;
+
+ let plans = query
+ .queries
+ .into_iter()
+ .map(|sub_query| {
+ subquery_to_plan(
+ ContextProviderAdapter::new(provider, read_parallelism,
dyn_config),
+ &range,
+ sub_query,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(OpentsdbQueryPlan { plans })
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+
+ use super::*;
+ use crate::tests::MockMetaProvider;
+
+ #[test]
+ fn test_normalize_filters() {
+ let mut tags = HashMap::new();
+ tags.insert("tag1".to_string(), "tag1_value".to_string());
+
+ let (groupby_col_names, exprs) = normalize_filters(tags.clone(),
vec![]).unwrap();
+ assert_eq!(groupby_col_names, vec!["tag1"]);
+ assert_eq!(exprs.len(), 1);
+
+ let (groupby_col_names, exprs) = normalize_filters(
+ tags.clone(),
+ vec![Filter {
+ r#type: "literal_or".to_string(),
+ tagk: "tag2".to_string(),
+ filter: "tag2_value|tag2_value2".to_string(),
+ group_by: false,
+ }],
+ )
+ .unwrap();
+ assert_eq!(groupby_col_names, vec!["tag1"]);
+ assert_eq!(exprs.len(), 2);
+
+ let (groupby_col_names, exprs) = normalize_filters(
+ tags.clone(),
+ vec![Filter {
+ r#type: "literal_or".to_string(),
+ tagk: "tag2".to_string(),
+ filter: "tag2_value|tag2_value2".to_string(),
+ group_by: true,
+ }],
+ )
+ .unwrap();
+ assert_eq!(groupby_col_names, vec!["tag1", "tag2"]);
+ assert_eq!(exprs.len(), 2);
+
+ let (groupby_col_names, exprs) = normalize_filters(
+ tags.clone(),
+ vec![Filter {
+ r#type: "not_literal_or".to_string(),
+ tagk: "tag2".to_string(),
+ filter: "tag2_value|tag2_value2".to_string(),
+ group_by: false,
+ }],
+ )
+ .unwrap();
+ assert_eq!(groupby_col_names, vec!["tag1"]);
+ assert_eq!(exprs.len(), 2);
+ }
+
+ #[test]
+ fn test_build_aggr_expr() {
+ let aggr = build_aggr_expr("sum").unwrap().unwrap();
+ assert_eq!(aggr.to_string(), "SUM(value) AS value");
+
+ let aggr = build_aggr_expr("count").unwrap().unwrap();
+ assert_eq!(aggr.to_string(), "COUNT(value) AS value");
+
+ let aggr = build_aggr_expr("avg").unwrap().unwrap();
+ assert_eq!(aggr.to_string(), "AVG(value) AS value");
+
+ let aggr = build_aggr_expr("min").unwrap().unwrap();
+ assert_eq!(aggr.to_string(), "MIN(value) AS value");
+
+ let aggr = build_aggr_expr("max").unwrap().unwrap();
+ assert_eq!(aggr.to_string(), "MAX(value) AS value");
+
+ let aggr = build_aggr_expr("dev").unwrap().unwrap();
+ assert_eq!(aggr.to_string(), "STDDEV(value) AS value");
+
+ let aggr = build_aggr_expr("none").unwrap();
+ assert!(aggr.is_none());
+
+ let err = build_aggr_expr("invalid").unwrap_err();
+ assert!(err.to_string().contains("Invalid aggregator"));
+ }
+
+ #[test]
+ fn test_subquery_to_plan() {
+ let meta_provider = MockMetaProvider::default();
+
+ let query_range = TimeRange::new(Timestamp::new(0),
Timestamp::new(100)).unwrap();
+ let sub_query = SubQuery {
+ metric: "metric".to_string(),
+ aggregator: "sum".to_string(),
+ rate: false,
+ downsample: None,
+ tags: vec![("tag1".to_string(), "tag1_value".to_string())]
+ .into_iter()
+ .collect(),
+ filters: vec![Filter {
+ r#type: "literal_or".to_string(),
+ tagk: "tag2".to_string(),
+ filter: "tag2_value|tag2_value2".to_string(),
+ group_by: true,
+ }],
+ };
+
+ let plan = subquery_to_plan(
+ ContextProviderAdapter::new(&meta_provider, 1,
&Default::default()),
+ &query_range,
+ sub_query,
+ )
+ .unwrap();
+
+ assert_eq!(plan.metric, "metric");
+ assert_eq!(plan.field_col_name, "value");
+ assert_eq!(plan.timestamp_col_name, "timestamp");
+ assert_eq!(plan.tags, vec!["tag1", "tag2"]);
+ assert_eq!(plan.aggregated_tags, vec!["tag1", "tag2"]);
+
+ let df_plan = match plan.plan {
+ Plan::Query(QueryPlan { df_plan, .. }) => df_plan,
+ _ => panic!("expect query plan"),
+ };
+ assert_eq!(df_plan.schema().fields().len(), 4);
+ }
+
+ #[test]
+ fn test_opentsdb_query_to_plan() {
+ let meta_provider = MockMetaProvider::default();
+
+ let query = QueryRequest {
+ start: 0,
+ end: 100,
+ queries: vec![
+ SubQuery {
+ metric: "metric".to_string(),
+ aggregator: "none".to_string(),
+ rate: false,
+ downsample: None,
+ tags: vec![("tag1".to_string(), "tag1_value".to_string())]
+ .into_iter()
+ .collect(),
+ filters: vec![],
+ },
+ SubQuery {
+ metric: "metric".to_string(),
+ aggregator: "sum".to_string(),
+ rate: false,
+ downsample: None,
+ tags: vec![("tag1".to_string(), "tag1_value".to_string())]
+ .into_iter()
+ .collect(),
+ filters: vec![],
+ },
+ ],
+ ms_resolution: false,
+ };
+
+ let plan =
+ opentsdb_query_to_plan(query, &meta_provider, 1,
&DynamicConfig::default()).unwrap();
+
+ assert_eq!(plan.plans.len(), 2);
+
+ let plan0 = &plan.plans[0];
+ assert_eq!(plan0.metric, "metric");
+ assert_eq!(plan0.field_col_name, "value");
+ assert_eq!(plan0.timestamp_col_name, "timestamp");
+ assert_eq!(plan0.tags, vec!["tag1", "tag2"]);
+ assert!(plan0.aggregated_tags.is_empty());
+
+ let df_plan = match &plan0.plan {
+ Plan::Query(QueryPlan { df_plan, .. }) => df_plan,
+ _ => panic!("expect query plan"),
+ };
+ assert_eq!(df_plan.schema().fields().len(), 5);
+
+ let plan1 = &plan.plans[1];
+ assert_eq!(plan1.metric, "metric");
+ assert_eq!(plan1.field_col_name, "value");
+ assert_eq!(plan1.timestamp_col_name, "timestamp");
+ assert_eq!(plan1.tags, vec!["tag1"]);
+ assert_eq!(plan1.aggregated_tags, vec!["tag1"]);
+
+ let df_plan = match &plan1.plan {
+ Plan::Query(QueryPlan { df_plan, .. }) => df_plan,
+ _ => panic!("expect query plan"),
+ };
+ assert_eq!(df_plan.schema().fields().len(), 3);
+ }
+}
diff --git a/src/query_frontend/src/opentsdb/types.rs
b/src/query_frontend/src/opentsdb/types.rs
new file mode 100644
index 00000000..791ee7c2
--- /dev/null
+++ b/src/query_frontend/src/opentsdb/types.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+use crate::plan::Plan;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Filter {
+ pub r#type: String,
+ pub tagk: String,
+ pub filter: String,
+ #[serde(rename = "groupBy")]
+ pub group_by: bool,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct SubQuery {
+ pub metric: String,
+ pub aggregator: String,
+ #[serde(default)]
+ pub rate: bool,
+ pub downsample: Option<String>,
+ #[serde(default)]
+ pub tags: HashMap<String, String>,
+ #[serde(default)]
+ pub filters: Vec<Filter>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct QueryRequest {
+ pub start: i64,
+ pub end: i64,
+ pub queries: Vec<SubQuery>,
+ #[serde(rename = "msResolution", default)]
+ pub ms_resolution: bool,
+}
+
+pub struct OpentsdbSubPlan {
+ pub plan: Plan,
+ pub metric: String,
+ pub field_col_name: String,
+ pub timestamp_col_name: String,
+ pub tags: Vec<String>,
+ pub aggregated_tags: Vec<String>,
+}
+
+pub struct OpentsdbQueryPlan {
+ pub plans: Vec<OpentsdbSubPlan>,
+}
diff --git a/src/query_frontend/src/planner.rs
b/src/query_frontend/src/planner.rs
index 9fc7ade9..b5d0fb50 100644
--- a/src/query_frontend/src/planner.rs
+++ b/src/query_frontend/src/planner.rs
@@ -72,6 +72,10 @@ use crate::{
container::TableReference,
frontend::parse_table_name_with_standard,
logical_optimizer::optimize_plan,
+ opentsdb::{
+ opentsdb_query_to_plan,
+ types::{OpentsdbQueryPlan, QueryRequest},
+ },
parser,
partition::PartitionParser,
plan::{
@@ -250,6 +254,9 @@ pub enum Error {
#[snafu(display("Failed to build plan from promql, error:{}", source))]
BuildPromPlanError { source: crate::promql::Error },
+ #[snafu(display("Failed to build opentsdb plan, error:{}", source))]
+ BuildOpentsdbPlanError { source: crate::opentsdb::Error },
+
#[snafu(display(
"Failed to cast default value expr to column type, expr:{}, from:{},
to:{}",
expr,
@@ -381,6 +388,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
.context(BuildInfluxqlPlan)
}
+ pub fn opentsdb_query_to_plan(&self, query: QueryRequest) ->
Result<OpentsdbQueryPlan> {
+ opentsdb_query_to_plan(query, self.provider, self.read_parallelism,
self.dyn_config)
+ .context(BuildOpentsdbPlanError)
+ }
+
pub fn write_req_to_plan(
&self,
schema_config: &SchemaConfig,
diff --git a/src/query_frontend/src/tests.rs b/src/query_frontend/src/tests.rs
index 7d180e6d..d110a416 100644
--- a/src/query_frontend/src/tests.rs
+++ b/src/query_frontend/src/tests.rs
@@ -18,7 +18,9 @@
use std::sync::Arc;
use catalog::consts::{DEFAULT_CATALOG, DEFAULT_SCHEMA};
-use common_types::tests::{build_default_value_schema, build_schema,
build_schema_for_cpu};
+use common_types::tests::{
+ build_default_value_schema, build_schema, build_schema_for_cpu,
build_schema_for_metric,
+};
use datafusion::catalog::TableReference;
use df_operator::{scalar::ScalarUdf, udaf::AggregateUdf};
use partition_table_engine::test_util::PartitionedMemoryTable;
@@ -86,6 +88,13 @@ impl Default for MockMetaProvider {
)),
// Used in `test_partitioned_table_query_to_plan`
Arc::new(test_partitioned_table),
+ // Used in `test_subquery_to_plan` and
`test_opentsdb_query_to_plan`
+ Arc::new(MemoryTable::new(
+ "metric".to_string(),
+ TableId::from(106),
+ build_schema_for_metric(),
+ ANALYTIC_ENGINE_TYPE.to_string(),
+ )),
],
}
}
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 43a1910f..95ce7a18 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -417,7 +417,8 @@ impl Service {
warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api))
}
- // POST /opentsdb/api/put
+ /// Expose `/opentsdb/api/put` and `/opentsdb/api/query` to serve opentsdb
+ /// API
fn opentsdb_api(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> +
Clone {
@@ -455,7 +456,21 @@ impl Service {
}
});
- warp::path!("opentsdb" / "api" / ..).and(put_api)
+ let query_api = warp::path!("query")
+ .and(warp::post())
+ .and(body_limit)
+ .and(self.with_context())
+ .and(warp::body::json())
+ .and(self.with_proxy())
+ .and_then(|ctx, req, proxy: Arc<Proxy>| async move {
+ let result = proxy.handle_opentsdb_query(ctx, req).await;
+ match result {
+ Ok(res) => Ok(reply::json(&res)),
+ Err(e) => Err(reject::custom(e)),
+ }
+ });
+
+ warp::path!("opentsdb" / "api" / ..).and(put_api.or(query_api))
}
// POST /debug/flush_memtable
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]