jiacai2050 commented on code in PR #1624:
URL: https://github.com/apache/horaedb/pull/1624#discussion_r1944156028
##########
src/metric_engine/src/index/mod.rs:
##########
@@ -15,30 +15,123 @@
// specific language governing permissions and limitations
// under the License.
+mod cache;
use std::sync::Arc;
-use horaedb_storage::storage::TimeMergeStorageRef;
+use cache::CacheManager;
+use horaedb_storage::storage::StorageRuntimes;
+use object_store::local::LocalFileSystem;
+use tokio::runtime::Runtime;
-use crate::{types::Sample, Result};
+use crate::{
+ types::{hash, MetricId, Sample, SeriesId, SeriesKey},
+ Result,
+};
pub struct IndexManager {
inner: Arc<Inner>,
}
impl IndexManager {
- pub fn new(storage: TimeMergeStorageRef) -> Self {
- Self {
- inner: Arc::new(Inner { storage }),
- }
+ pub async fn try_new() -> Result<Self> {
+ // TODO: maybe initialize runtime and store by config, now just make it
+ // compilable
+ let rt = Arc::new(Runtime::new().unwrap());
+ let runtimes = StorageRuntimes::new(rt.clone(), rt);
+ let store = Arc::new(LocalFileSystem::new());
+ let root_dir = "/tmp/horaedb".to_string();
+
+ Ok(Self {
+ inner: Arc::new(Inner {
+ cache: CacheManager::try_new(runtimes, store,
root_dir.as_str()).await?,
+ }),
+ })
}
/// Populate series ids from labels.
/// It will also build inverted index for labels.
- pub async fn populate_series_ids(&self, _samples: &mut [Sample]) ->
Result<()> {
- todo!()
+ pub async fn populate_series_ids(&self, samples: &mut [Sample]) ->
Result<()> {
+ // 1.1 create metric id and series id
+ let metric_ids = samples
+ .iter()
+ .map(|s| MetricId(hash(s.name.as_slice())))
+ .collect::<Vec<_>>();
Review Comment:
We should avoid creating a new Vec in the write path; it will hurt performance.
##########
src/metric_engine/src/index/mod.rs:
##########
@@ -15,30 +15,123 @@
// specific language governing permissions and limitations
// under the License.
+mod cache;
use std::sync::Arc;
-use horaedb_storage::storage::TimeMergeStorageRef;
+use cache::CacheManager;
+use horaedb_storage::storage::StorageRuntimes;
+use object_store::local::LocalFileSystem;
+use tokio::runtime::Runtime;
-use crate::{types::Sample, Result};
+use crate::{
+ types::{hash, MetricId, Sample, SeriesId, SeriesKey},
+ Result,
+};
pub struct IndexManager {
inner: Arc<Inner>,
}
impl IndexManager {
- pub fn new(storage: TimeMergeStorageRef) -> Self {
- Self {
- inner: Arc::new(Inner { storage }),
- }
+ pub async fn try_new() -> Result<Self> {
+ // TODO: maybe initialize runtime and store by config, now just make it
+ // compilable
+ let rt = Arc::new(Runtime::new().unwrap());
+ let runtimes = StorageRuntimes::new(rt.clone(), rt);
+ let store = Arc::new(LocalFileSystem::new());
+ let root_dir = "/tmp/horaedb".to_string();
+
+ Ok(Self {
+ inner: Arc::new(Inner {
+ cache: CacheManager::try_new(runtimes, store,
root_dir.as_str()).await?,
+ }),
+ })
}
/// Populate series ids from labels.
/// It will also build inverted index for labels.
- pub async fn populate_series_ids(&self, _samples: &mut [Sample]) ->
Result<()> {
- todo!()
+ pub async fn populate_series_ids(&self, samples: &mut [Sample]) ->
Result<()> {
+ // 1.1 create metric id and series id
+ let metric_ids = samples
+ .iter()
+ .map(|s| MetricId(hash(s.name.as_slice())))
+ .collect::<Vec<_>>();
+
+ let series_keys = samples
+ .iter()
+ .map(|s| SeriesKey::new(Some(s.name.as_slice()),
s.lables.as_slice()))
+ .collect::<Vec<_>>();
+ let series_ids = series_keys
+ .iter()
+ .map(|e| SeriesId(hash(e.make_bytes().as_slice())))
+ .collect::<Vec<_>>();
+
+ // 1.2 populate metric id and series id
+ samples.iter_mut().enumerate().for_each(|(i, sample)| {
+ sample.name_id = Some(metric_ids[i]);
+ sample.series_id = Some(series_ids[i]);
+ });
+
+ // 2.1 update cache metrics
+ futures::future::join_all(
Review Comment:
We should do this in the `MetricManager` module.
##########
src/metric_engine/src/index/cache.rs:
##########
@@ -0,0 +1,1293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::{HashMap, HashSet},
+ path::Path,
+ sync::Arc,
+ time::{Duration, SystemTime},
+};
+
+use anyhow::Context;
+use arrow::{
+ array::{
+ Array, ArrayRef, BinaryArray, BinaryBuilder, ListArray, UInt64Array,
UInt64Builder,
+ UInt8Array, UInt8Builder,
+ },
+ buffer::OffsetBuffer,
+ datatypes::{DataType, Field, Schema, ToByteSlice},
+ record_batch::RecordBatch,
+};
+use dashmap::DashMap;
+use futures::StreamExt;
+use horaedb_storage::{
+ config::StorageConfig,
+ storage::{
+ CloudObjectStorage, ScanRequest, StorageRuntimes, TimeMergeStorageRef,
WriteRequest,
+ },
+ types::{ObjectStoreRef, TimeRange, Timestamp},
+};
+use tokio::{
+ sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
+ },
+ time::timeout,
+};
+use tracing::{error, warn};
+
+use crate::types::{
+ hash, FieldName, FieldType, Label, MetricId, MetricName, Result,
SegmentDuration, SeriesId,
+ SeriesKey, TagName, TagNames, TagValue, TagValues, DEFAULT_FIELD_NAME,
DEFAULT_FIELD_TYPE,
+};
+
+const COLUMN_DURATION: &str = "duration";
+const COLUMN_METRIC_NAME: &str = "metric_name";
+const COLUMN_METRIC_ID: &str = "metric_id";
+const COLUMN_SERIES_ID: &str = "series_id";
+const COLUMN_FIELD_ID: &str = "field_id";
+const COLUMN_FIELD_NAME: &str = "field_name";
+const COLUMN_FIELD_TYPE: &str = "field_type";
+const COLUMN_TAG_NAMES: &str = "tag_names";
+const COLUMN_TAG_VALUES: &str = "tag_values";
+const COLUMN_TAG_NAME: &str = "tag_name";
+const COLUMN_TAG_VALUE: &str = "tag_value";
+const COLUMN_TAG_ITEM: &str = "item";
+
+type ConcurrentMetricMap = RwLock<HashMap<MetricName, (FieldName, FieldType)>>;
+type ConcurrentSeriesMap = RwLock<HashMap<SeriesId, SeriesKey>>;
+type ConcurrentTagKVMap =
+ RwLock<HashMap<TagName, HashMap<TagValue, HashMap<MetricId,
HashSet<SeriesId>>>>>;
+
+struct MetricsCache {
+ cache: DashMap<SegmentDuration, ConcurrentMetricMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+struct SeriesCache {
+ cache: DashMap<SegmentDuration, ConcurrentSeriesMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+
+#[derive(PartialEq, Eq, Hash, Debug)]
+struct SegmentSeries {
+ segment: SegmentDuration,
+ series_id: SeriesId,
+}
+
+struct TagIndexCache {
+ cache: DashMap<SegmentDuration, ConcurrentTagKVMap>,
Review Comment:
Two thoughts here:
1. We should prefer the std map over DashMap; third-party crates may introduce
unexpected bugs.
2. For the current segment's cache, we can use an independent field to
represent it, so we can save one hashmap lookup.
Other questions we need to consider:
1. How will you evict segments?
##########
src/metric_engine/src/index/cache.rs:
##########
@@ -0,0 +1,1293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::{HashMap, HashSet},
+ path::Path,
+ sync::Arc,
+ time::{Duration, SystemTime},
+};
+
+use anyhow::Context;
+use arrow::{
+ array::{
+ Array, ArrayRef, BinaryArray, BinaryBuilder, ListArray, UInt64Array,
UInt64Builder,
+ UInt8Array, UInt8Builder,
+ },
+ buffer::OffsetBuffer,
+ datatypes::{DataType, Field, Schema, ToByteSlice},
+ record_batch::RecordBatch,
+};
+use dashmap::DashMap;
+use futures::StreamExt;
+use horaedb_storage::{
+ config::StorageConfig,
+ storage::{
+ CloudObjectStorage, ScanRequest, StorageRuntimes, TimeMergeStorageRef,
WriteRequest,
+ },
+ types::{ObjectStoreRef, TimeRange, Timestamp},
+};
+use tokio::{
+ sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
+ },
+ time::timeout,
+};
+use tracing::{error, warn};
+
+use crate::types::{
+ hash, FieldName, FieldType, Label, MetricId, MetricName, Result,
SegmentDuration, SeriesId,
+ SeriesKey, TagName, TagNames, TagValue, TagValues, DEFAULT_FIELD_NAME,
DEFAULT_FIELD_TYPE,
+};
+
+const COLUMN_DURATION: &str = "duration";
+const COLUMN_METRIC_NAME: &str = "metric_name";
+const COLUMN_METRIC_ID: &str = "metric_id";
+const COLUMN_SERIES_ID: &str = "series_id";
+const COLUMN_FIELD_ID: &str = "field_id";
+const COLUMN_FIELD_NAME: &str = "field_name";
+const COLUMN_FIELD_TYPE: &str = "field_type";
+const COLUMN_TAG_NAMES: &str = "tag_names";
+const COLUMN_TAG_VALUES: &str = "tag_values";
+const COLUMN_TAG_NAME: &str = "tag_name";
+const COLUMN_TAG_VALUE: &str = "tag_value";
+const COLUMN_TAG_ITEM: &str = "item";
+
+type ConcurrentMetricMap = RwLock<HashMap<MetricName, (FieldName, FieldType)>>;
+type ConcurrentSeriesMap = RwLock<HashMap<SeriesId, SeriesKey>>;
+type ConcurrentTagKVMap =
+ RwLock<HashMap<TagName, HashMap<TagValue, HashMap<MetricId,
HashSet<SeriesId>>>>>;
+
+struct MetricsCache {
+ cache: DashMap<SegmentDuration, ConcurrentMetricMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+struct SeriesCache {
+ cache: DashMap<SegmentDuration, ConcurrentSeriesMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+
+#[derive(PartialEq, Eq, Hash, Debug)]
+struct SegmentSeries {
+ segment: SegmentDuration,
+ series_id: SeriesId,
+}
+
+struct TagIndexCache {
+ cache: DashMap<SegmentDuration, ConcurrentTagKVMap>,
+ series_records: RwLock<HashSet<SegmentSeries>>,
+ storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+
+impl MetricsCache {
+ fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+ Self {
+ cache: DashMap::new(),
+ storage,
+ sender,
+ }
+ }
+
+ #[allow(clippy::type_complexity)]
+ fn parse_record_batch(
+ batch: &RecordBatch,
+ index: usize,
+ ) -> Result<(&[u8], &[u8], u8, u64, u64, u64)> {
+ let metric_name = batch
+ .column_by_name(COLUMN_METRIC_NAME)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let field_name = batch
+ .column_by_name(COLUMN_FIELD_NAME)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let field_type = batch
+ .column_by_name(COLUMN_FIELD_TYPE)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt8Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let filed_id = batch
+ .column_by_name(COLUMN_FIELD_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let metric_id = batch
+ .column_by_name(COLUMN_METRIC_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let duration = batch
+ .column_by_name(COLUMN_DURATION)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ Ok((
+ metric_name,
+ field_name,
+ field_type,
+ filed_id,
+ metric_id,
+ duration,
+ ))
+ }
+
+ async fn load_from_storage(&mut self) -> Result<()> {
+ let mut result_stream = self
+ .storage
+ .scan(ScanRequest {
+ range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+ predicate: vec![],
+ projections: None,
+ })
+ .await?;
+ while let Some(item) = result_stream.next().await {
+ let batch = item.context("get next batch failed")?;
+ for index in 0..batch.num_rows() {
+ let (metric_name, field_name, field_type, _, _, duration) =
+ MetricsCache::parse_record_batch(&batch, index)?;
+ self.update(
+ SegmentDuration::date(Duration::from_millis(duration)),
+ metric_name,
+ field_name,
+ field_type,
+ )
+ .await?;
+ }
+ }
+ Ok(())
+ }
+
+ fn schema() -> Arc<Schema> {
Review Comment:
The cache module has no schema; this field belongs to IndexManager.
##########
src/metric_engine/src/index/cache.rs:
##########
@@ -0,0 +1,1293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::{HashMap, HashSet},
+ path::Path,
+ sync::Arc,
+ time::{Duration, SystemTime},
+};
+
+use anyhow::Context;
+use arrow::{
+ array::{
+ Array, ArrayRef, BinaryArray, BinaryBuilder, ListArray, UInt64Array,
UInt64Builder,
+ UInt8Array, UInt8Builder,
+ },
+ buffer::OffsetBuffer,
+ datatypes::{DataType, Field, Schema, ToByteSlice},
+ record_batch::RecordBatch,
+};
+use dashmap::DashMap;
+use futures::StreamExt;
+use horaedb_storage::{
+ config::StorageConfig,
+ storage::{
+ CloudObjectStorage, ScanRequest, StorageRuntimes, TimeMergeStorageRef,
WriteRequest,
+ },
+ types::{ObjectStoreRef, TimeRange, Timestamp},
+};
+use tokio::{
+ sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
+ },
+ time::timeout,
+};
+use tracing::{error, warn};
+
+use crate::types::{
+ hash, FieldName, FieldType, Label, MetricId, MetricName, Result,
SegmentDuration, SeriesId,
+ SeriesKey, TagName, TagNames, TagValue, TagValues, DEFAULT_FIELD_NAME,
DEFAULT_FIELD_TYPE,
+};
+
+const COLUMN_DURATION: &str = "duration";
+const COLUMN_METRIC_NAME: &str = "metric_name";
+const COLUMN_METRIC_ID: &str = "metric_id";
+const COLUMN_SERIES_ID: &str = "series_id";
+const COLUMN_FIELD_ID: &str = "field_id";
+const COLUMN_FIELD_NAME: &str = "field_name";
+const COLUMN_FIELD_TYPE: &str = "field_type";
+const COLUMN_TAG_NAMES: &str = "tag_names";
+const COLUMN_TAG_VALUES: &str = "tag_values";
+const COLUMN_TAG_NAME: &str = "tag_name";
+const COLUMN_TAG_VALUE: &str = "tag_value";
+const COLUMN_TAG_ITEM: &str = "item";
+
+type ConcurrentMetricMap = RwLock<HashMap<MetricName, (FieldName, FieldType)>>;
+type ConcurrentSeriesMap = RwLock<HashMap<SeriesId, SeriesKey>>;
+type ConcurrentTagKVMap =
+ RwLock<HashMap<TagName, HashMap<TagValue, HashMap<MetricId,
HashSet<SeriesId>>>>>;
+
+struct MetricsCache {
+ cache: DashMap<SegmentDuration, ConcurrentMetricMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+struct SeriesCache {
+ cache: DashMap<SegmentDuration, ConcurrentSeriesMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+
+#[derive(PartialEq, Eq, Hash, Debug)]
+struct SegmentSeries {
+ segment: SegmentDuration,
+ series_id: SeriesId,
+}
+
+struct TagIndexCache {
+ cache: DashMap<SegmentDuration, ConcurrentTagKVMap>,
+ series_records: RwLock<HashSet<SegmentSeries>>,
+ storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+
+impl MetricsCache {
+ fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+ Self {
+ cache: DashMap::new(),
+ storage,
+ sender,
+ }
+ }
+
+ #[allow(clippy::type_complexity)]
+ fn parse_record_batch(
+ batch: &RecordBatch,
+ index: usize,
+ ) -> Result<(&[u8], &[u8], u8, u64, u64, u64)> {
+ let metric_name = batch
+ .column_by_name(COLUMN_METRIC_NAME)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let field_name = batch
+ .column_by_name(COLUMN_FIELD_NAME)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let field_type = batch
+ .column_by_name(COLUMN_FIELD_TYPE)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt8Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let filed_id = batch
+ .column_by_name(COLUMN_FIELD_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let metric_id = batch
+ .column_by_name(COLUMN_METRIC_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let duration = batch
+ .column_by_name(COLUMN_DURATION)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ Ok((
+ metric_name,
+ field_name,
+ field_type,
+ filed_id,
+ metric_id,
+ duration,
+ ))
+ }
+
+ async fn load_from_storage(&mut self) -> Result<()> {
+ let mut result_stream = self
+ .storage
+ .scan(ScanRequest {
+ range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+ predicate: vec![],
+ projections: None,
+ })
+ .await?;
+ while let Some(item) = result_stream.next().await {
+ let batch = item.context("get next batch failed")?;
+ for index in 0..batch.num_rows() {
+ let (metric_name, field_name, field_type, _, _, duration) =
+ MetricsCache::parse_record_batch(&batch, index)?;
+ self.update(
+ SegmentDuration::date(Duration::from_millis(duration)),
+ metric_name,
+ field_name,
+ field_type,
+ )
+ .await?;
+ }
+ }
+ Ok(())
+ }
+
+ fn schema() -> Arc<Schema> {
+ Arc::new(Schema::new(vec![
+ Field::new(COLUMN_METRIC_NAME, DataType::Binary, true),
+ Field::new(COLUMN_METRIC_ID, DataType::UInt64, true),
+ Field::new(COLUMN_FIELD_NAME, DataType::Binary, true),
+ Field::new(COLUMN_FIELD_ID, DataType::UInt64, true),
+ Field::new(COLUMN_FIELD_TYPE, DataType::UInt8, true),
+ Field::new(COLUMN_DURATION, DataType::UInt64, true),
+ ]))
+ }
+
+ async fn update(
+ &self,
+ date: SegmentDuration,
+ name: &[u8],
+ field_name: &[u8],
+ field_type: u8,
+ ) -> Result<bool> {
+ if self.cache.contains_key(&date)
+ && self
+ .cache
+ .get(&date)
+ .context("get key failed")?
+ .read()
+ .await
+ .contains_key(name)
+ {
+ Ok(false)
+ } else {
+ let result = self
+ .cache
+ .entry(date)
+ .or_default()
+ .write()
+ .await
+ .insert(name.to_vec(), (field_name.to_vec(), field_type));
+
+ Ok(result.is_none())
+ }
+ }
+
+ async fn notify_write(
+ &self,
+ current: Duration,
+ name: &[u8],
+ field_name: &[u8],
+ field_type: u8,
+ ) -> Result<()> {
+ self.sender
+ .send(Task::Metric(
+ current,
+ name.to_vec(),
+ field_name.to_vec(),
+ field_type,
+ ))
+ .await
+ .context("notify write failed.")?;
+ Ok(())
+ }
+}
+
+impl SeriesCache {
+ fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+ Self {
+ cache: DashMap::new(),
+ storage,
+ sender,
+ }
+ }
+
+ async fn parse_record_batch(
+ batch: &RecordBatch,
+ index: usize,
+ ) -> Result<(u64, Vec<Vec<u8>>, Vec<Vec<u8>>, u64)> {
+ let series_id = batch
+ .column_by_name(COLUMN_SERIES_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let tag_names = {
+ let tag_name_array = batch
+ .column_by_name(COLUMN_TAG_NAMES)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .context("parse column failed")?
+ .value(index);
+ let tag_names = tag_name_array
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?;
+ tag_names
+ .iter()
+ .map(|item| item.unwrap_or(b"").to_vec())
+ .collect::<Vec<_>>()
+ };
+
+ let tag_values = {
+ let tag_value_array = batch
+ .column_by_name(COLUMN_TAG_VALUES)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .context("parse column failed")?
+ .value(index);
+ let tag_values = tag_value_array
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .expect("List elements should be BinaryArray");
+ tag_values
+ .iter()
+ .map(|item| item.unwrap_or(b"").to_vec())
+ .collect::<Vec<_>>()
+ };
+
+ let duration = batch
+ .column_by_name(COLUMN_DURATION)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+ Ok((series_id, tag_names, tag_values, duration))
+ }
+
+ async fn load_from_storage(&mut self) -> Result<()> {
+ let mut result_stream = self
+ .storage
+ .scan(ScanRequest {
+ range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+ predicate: vec![],
+ projections: None,
+ })
+ .await?;
+ while let Some(item) = result_stream.next().await {
+ let batch = item.context("get next batch failed.")?;
+ for index in 0..batch.num_rows() {
+ let (series_id, tag_names, tag_values, duration) =
+ SeriesCache::parse_record_batch(&batch, index).await?;
+ let labels = tag_names
+ .into_iter()
+ .zip(tag_values.into_iter())
+ .map(|(name, value)| Label { name, value })
+ .collect::<Vec<_>>();
+ let key = SeriesKey::new(None, labels.as_slice());
+ self.update(
+ SegmentDuration::date(Duration::from_millis(duration)),
+ &SeriesId(series_id),
+ &key,
+ )
+ .await?;
+ }
+ }
+ Ok(())
+ }
+
+ pub fn schema() -> Arc<Schema> {
+ Arc::new(Schema::new(vec![
+ Field::new(COLUMN_METRIC_ID, DataType::UInt64, true),
+ Field::new(COLUMN_SERIES_ID, DataType::UInt64, true),
+ Field::new(
+ COLUMN_TAG_NAMES,
+ DataType::List(Arc::new(Field::new(
+ COLUMN_TAG_ITEM,
+ DataType::Binary,
+ true,
+ ))),
+ true,
+ ),
+ Field::new(
+ COLUMN_TAG_VALUES,
+ DataType::List(Arc::new(Field::new(
+ COLUMN_TAG_ITEM,
+ DataType::Binary,
+ true,
+ ))),
+ true,
+ ),
+ Field::new(COLUMN_DURATION, DataType::UInt64, true),
+ ]))
+ }
+
+ async fn update(&self, date: SegmentDuration, id: &SeriesId, key:
&SeriesKey) -> Result<bool> {
+ if self.cache.contains_key(&date)
+ && self
+ .cache
+ .get(&date)
+ .context("get key failed")?
+ .read()
+ .await
+ .contains_key(id)
+ {
+ Ok(false)
+ } else {
+ let result = self
+ .cache
+ .entry(date)
+ .or_default()
+ .write()
+ .await
+ .insert(*id, key.clone());
+
+ Ok(result.is_none())
+ }
+ }
+
+ async fn notify_write(
+ &self,
+ current: Duration,
+ id: &SeriesId,
+ key: &SeriesKey,
+ metric_id: &MetricId,
+ ) -> Result<()> {
+ self.sender
+ .send(Task::Series(current, *id, key.clone(), *metric_id))
+ .await
+ .context("notify write failed.")?;
+ Ok(())
+ }
+}
+
+impl TagIndexCache {
+ fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+ Self {
+ cache: DashMap::new(),
+ series_records: RwLock::new(HashSet::new()),
+ storage,
+ sender,
+ }
+ }
+
+ async fn parse_record_batch(
+ batch: &RecordBatch,
+ index: usize,
+ ) -> Result<(u64, &[u8], &[u8], u64, u64)> {
+ let metric_id = batch
+ .column_by_name(COLUMN_METRIC_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let tag_name = batch
+ .column_by_name(COLUMN_TAG_NAME)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let tag_value = batch
+ .column_by_name(COLUMN_TAG_VALUE)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let series_id = batch
+ .column_by_name(COLUMN_SERIES_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let duration = batch
+ .column_by_name(COLUMN_DURATION)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ Ok((metric_id, tag_name, tag_value, series_id, duration))
+ }
+
+ async fn load_from_storage(&mut self) -> Result<()> {
+ let mut result_stream = self
+ .storage
+ .scan(ScanRequest {
+ range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+ predicate: vec![],
+ projections: None,
+ })
+ .await
+ .unwrap();
+ while let Some(item) = result_stream.next().await {
+ let batch = item.context("get next batch failed.")?;
+ for index in 0..batch.num_rows() {
+ let (series_id, tag_name, tag_value, metric_id, duration) =
+ TagIndexCache::parse_record_batch(&batch, index).await?;
+ self.update(
+ SegmentDuration::date(Duration::from_millis(duration)),
+ &SeriesId(series_id),
+ &vec![tag_name.to_vec()],
+ &vec![tag_value.to_vec()],
+ &MetricId(metric_id),
+ )
+ .await?;
+ }
+ }
+ Ok(())
+ }
+
+ fn schema() -> Arc<Schema> {
+ Arc::new(Schema::new(vec![
+ Field::new(COLUMN_METRIC_ID, DataType::UInt64, true),
+ Field::new(COLUMN_TAG_NAME, DataType::Binary, true),
+ Field::new(COLUMN_TAG_VALUE, DataType::Binary, true),
+ Field::new(COLUMN_SERIES_ID, DataType::UInt64, true),
+ Field::new(COLUMN_DURATION, DataType::UInt64, true),
+ ]))
+ }
+
+ async fn update(
+ &self,
+ date: SegmentDuration,
+ series_id: &SeriesId,
+ tag_names: &TagNames,
+ tag_values: &TagValues,
+ metric_id: &MetricId,
+ ) -> Result<bool> {
+ let segment_series = SegmentSeries {
+ segment: date,
+ series_id: *series_id,
+ };
+ if self.series_records.read().await.contains(&segment_series) {
+ Ok(false)
+ } else {
+ let mut series_records = self.series_records.write().await;
+ if series_records.contains(&segment_series) {
+ Ok(false)
+ } else {
+ series_records.insert(segment_series);
+ let cache_lock = self.cache.entry(date).or_default();
+ let mut cache_guard = cache_lock.write().await;
+
+ let mut tag_names = tag_names.clone();
+ let mut tag_values = tag_values.clone();
+ remove_default_tag(&mut tag_names, &mut tag_values);
+ tag_names
+ .into_iter()
+ .zip(tag_values.into_iter())
+ .for_each(|(name, value)| {
+ cache_guard
+ .entry(name)
+ .or_default()
+ .entry(value)
+ .or_default()
+ .entry(*metric_id)
+ .or_default()
+ .insert(*series_id);
+ });
+ Ok(true)
+ }
+ }
+ }
+
+ async fn notify_write(
+ &self,
+ current: Duration,
+ series_id: &SeriesId,
+ tag_names: &TagNames,
+ tag_values: &TagValues,
+ metric_id: &MetricId,
+ ) -> Result<()> {
+ self.sender
+ .send(Task::TagIndex(
+ current,
+ *series_id,
+ tag_names.clone(),
+ tag_values.clone(),
+ *metric_id,
+ ))
+ .await
+ .context("notify write failed.")?;
+ Ok(())
+ }
+}
+
+pub struct CacheManager {
Review Comment:
The index cache manager will only cache series; metrics will be managed
by the metrics manager.
In this way, our code is more modular.
##########
src/metric_engine/src/index/cache.rs:
##########
@@ -0,0 +1,1293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::{HashMap, HashSet},
+ path::Path,
+ sync::Arc,
+ time::{Duration, SystemTime},
+};
+
+use anyhow::Context;
+use arrow::{
+ array::{
+ Array, ArrayRef, BinaryArray, BinaryBuilder, ListArray, UInt64Array,
UInt64Builder,
+ UInt8Array, UInt8Builder,
+ },
+ buffer::OffsetBuffer,
+ datatypes::{DataType, Field, Schema, ToByteSlice},
+ record_batch::RecordBatch,
+};
+use dashmap::DashMap;
+use futures::StreamExt;
+use horaedb_storage::{
+ config::StorageConfig,
+ storage::{
+ CloudObjectStorage, ScanRequest, StorageRuntimes, TimeMergeStorageRef,
WriteRequest,
+ },
+ types::{ObjectStoreRef, TimeRange, Timestamp},
+};
+use tokio::{
+ sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
+ },
+ time::timeout,
+};
+use tracing::{error, warn};
+
+use crate::types::{
+ hash, FieldName, FieldType, Label, MetricId, MetricName, Result,
SegmentDuration, SeriesId,
+ SeriesKey, TagName, TagNames, TagValue, TagValues, DEFAULT_FIELD_NAME,
DEFAULT_FIELD_TYPE,
+};
+
+const COLUMN_DURATION: &str = "duration";
+const COLUMN_METRIC_NAME: &str = "metric_name";
+const COLUMN_METRIC_ID: &str = "metric_id";
+const COLUMN_SERIES_ID: &str = "series_id";
+const COLUMN_FIELD_ID: &str = "field_id";
+const COLUMN_FIELD_NAME: &str = "field_name";
+const COLUMN_FIELD_TYPE: &str = "field_type";
+const COLUMN_TAG_NAMES: &str = "tag_names";
+const COLUMN_TAG_VALUES: &str = "tag_values";
+const COLUMN_TAG_NAME: &str = "tag_name";
+const COLUMN_TAG_VALUE: &str = "tag_value";
+const COLUMN_TAG_ITEM: &str = "item";
+
+type ConcurrentMetricMap = RwLock<HashMap<MetricName, (FieldName, FieldType)>>;
+type ConcurrentSeriesMap = RwLock<HashMap<SeriesId, SeriesKey>>;
+type ConcurrentTagKVMap =
+ RwLock<HashMap<TagName, HashMap<TagValue, HashMap<MetricId,
HashSet<SeriesId>>>>>;
+
+struct MetricsCache {
+ cache: DashMap<SegmentDuration, ConcurrentMetricMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+struct SeriesCache {
+ cache: DashMap<SegmentDuration, ConcurrentSeriesMap>,
+ pub storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+
+#[derive(PartialEq, Eq, Hash, Debug)]
+struct SegmentSeries {
+ segment: SegmentDuration,
+ series_id: SeriesId,
+}
+
+struct TagIndexCache {
+ cache: DashMap<SegmentDuration, ConcurrentTagKVMap>,
+ series_records: RwLock<HashSet<SegmentSeries>>,
+ storage: TimeMergeStorageRef,
+ sender: Sender<Task>,
+}
+
+impl MetricsCache {
+ fn new(storage: TimeMergeStorageRef, sender: Sender<Task>) -> Self {
+ Self {
+ cache: DashMap::new(),
+ storage,
+ sender,
+ }
+ }
+
+ #[allow(clippy::type_complexity)]
+ fn parse_record_batch(
+ batch: &RecordBatch,
+ index: usize,
+ ) -> Result<(&[u8], &[u8], u8, u64, u64, u64)> {
+ let metric_name = batch
+ .column_by_name(COLUMN_METRIC_NAME)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let field_name = batch
+ .column_by_name(COLUMN_FIELD_NAME)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .context("parse column failed")?
+ .value(index);
+
+ let field_type = batch
+ .column_by_name(COLUMN_FIELD_TYPE)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt8Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let filed_id = batch
+ .column_by_name(COLUMN_FIELD_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let metric_id = batch
+ .column_by_name(COLUMN_METRIC_ID)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ let duration = batch
+ .column_by_name(COLUMN_DURATION)
+ .context("get column failed")?
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .context("parse column failed")?
+ .value(index);
+
+ Ok((
+ metric_name,
+ field_name,
+ field_type,
+ filed_id,
+ metric_id,
+ duration,
+ ))
+ }
+
+ async fn load_from_storage(&mut self) -> Result<()> {
Review Comment:
The cache module shouldn't know how the persistence layer is implemented;
we can move this method to `IndexManager`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]