This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 72f63f21 feat: support persist manifest efficiently (#1596)
72f63f21 is described below
commit 72f63f2173f9da9c25ff241ce7fd497b16f634c1
Author: 鲍金日 <[email protected]>
AuthorDate: Fri Nov 22 11:12:50 2024 +0800
feat: support persist manifest efficiently (#1596)
## Rationale
close #1592
## Detailed Changes
- support merging manifest SSTs in the background
## Test Plan
CI
---------
Co-authored-by: jiacai2050 <[email protected]>
---
horaedb/Cargo.lock | 1 +
horaedb/metric_engine/Cargo.toml | 1 +
horaedb/metric_engine/src/manifest.rs | 349 +++++++++++++++++++++++++++++-----
horaedb/metric_engine/src/storage.rs | 16 +-
4 files changed, 320 insertions(+), 47 deletions(-)
diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 4c8aac43..1ab7b2bc 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -1569,6 +1569,7 @@ dependencies = [
"temp-dir",
"thiserror",
"tokio",
+ "tracing",
]
[[package]]
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index 095532bd..ed237fcb 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -47,6 +47,7 @@ pb_types = { workspace = true }
prost = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
+tracing = { workspace = true }
[dev-dependencies]
temp-dir = { workspace = true }
diff --git a/horaedb/metric_engine/src/manifest.rs
b/horaedb/metric_engine/src/manifest.rs
index 0d865a04..f9e6cef3 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -15,11 +15,25 @@
// specific language governing permissions and limitations
// under the License.
+use std::{
+ collections::HashSet,
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ },
+ time::Duration,
+};
+
use anyhow::Context;
use bytes::Bytes;
+use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use object_store::{path::Path, PutPayload};
use prost::Message;
-use tokio::sync::RwLock;
+use tokio::sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
+};
+use tracing::error;
use crate::{
sst::{FileId, FileMeta, SstFile},
@@ -29,11 +43,12 @@ use crate::{
pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
+pub const DELTA_PREFIX: &str = "delta";
pub struct Manifest {
- path: String,
- snapshot_path: Path,
+ delta_dir: Path,
store: ObjectStoreRef,
+ merger: Arc<ManifestMerger>,
payload: RwLock<Payload>,
}
@@ -42,6 +57,15 @@ pub struct Payload {
files: Vec<SstFile>,
}
+impl Payload {
+ // TODO: we could sort sst files by name asc, then the dedup will be more
+ // efficient
+ pub fn dedup_files(&mut self) {
+ let mut seen = HashSet::with_capacity(self.files.len());
+ self.files.retain(|file| seen.insert(file.id));
+ }
+}
+
impl TryFrom<pb_types::Manifest> for Payload {
type Error = Error;
@@ -69,63 +93,67 @@ impl From<Payload> for pb_types::Manifest {
}
impl Manifest {
- pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
- let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
- let payload = match store.get(&snapshot_path).await {
- Ok(v) => {
- let bytes = v
- .bytes()
- .await
- .context("failed to read manifest snapshot")?;
- let pb_payload = pb_types::Manifest::decode(bytes)
- .context("failed to decode manifest snapshot")?;
- Payload::try_from(pb_payload)?
- }
- Err(err) => {
- if err.to_string().contains("not found") {
- Payload { files: vec![] }
- } else {
- let context = format!("Failed to get manifest snapshot,
path:{snapshot_path}");
- return Err(AnyhowError::new(err).context(context).into());
- }
- }
- };
+ pub async fn try_new(
+ root_dir: String,
+ store: ObjectStoreRef,
+ merge_options: ManifestMergeOptions,
+ ) -> Result<Self> {
+ let snapshot_path =
Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
+ let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+
+ let merger = ManifestMerger::try_new(
+ snapshot_path.clone(),
+ delta_dir.clone(),
+ store.clone(),
+ merge_options,
+ )
+ .await?;
+
+ {
+ let merger = merger.clone();
+ // Start merger in background
+ tokio::spawn(async move {
+ merger.run().await;
+ });
+ }
+
+ let payload = read_snapshot(&store, &snapshot_path).await?;
Ok(Self {
- path,
- snapshot_path,
+ delta_dir,
store,
+ merger,
payload: RwLock::new(payload),
})
}
- // TODO: Now this functions is poorly implemented, we concat new_sst to
- // snapshot, and upload it back in a whole.
- // In more efficient way, we can create a new diff file, and do compaction
in
- // background to merge them to `snapshot`.
pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
- let mut payload = self.payload.write().await;
- let mut tmp_ssts = payload.files.clone();
+ self.merger.maybe_schedule_merge().await?;
+
+ let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir));
let new_sst = SstFile { id, meta };
- tmp_ssts.push(new_sst.clone());
- let pb_manifest = pb_types::Manifest {
- files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
- };
- let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
- pb_manifest
+ let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
+ let mut buf: Vec<u8> =
Vec::with_capacity(new_sst_payload.encoded_len());
+ new_sst_payload
.encode(&mut buf)
- .context("failed to encode manifest")?;
+ .context("failed to encode manifest file")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));
- // 1. Persist the snapshot
+ // 1. Persist the delta manifest
self.store
- .put(&self.snapshot_path, put_payload)
+ .put(&new_sst_path, put_payload)
.await
- .context("Failed to update manifest")?;
+ .with_context(|| format!("Failed to write delta manifest,
path:{}", new_sst_path))?;
// 2. Update cached payload
- payload.files.push(new_sst);
+ {
+ let mut payload = self.payload.write().await;
+ payload.files.push(new_sst);
+ }
+
+ // 3. Update delta files num
+ self.merger.add_delta_num();
Ok(())
}
@@ -141,3 +169,238 @@ impl Manifest {
.collect()
}
}
+
+enum MergeType {
+ Hard,
+ Soft,
+}
+
+#[derive(Clone)]
+pub struct ManifestMergeOptions {
+ channel_size: usize,
+ merge_interval_seconds: usize,
+ min_merge_threshold: usize,
+ hard_merge_threshold: usize,
+ soft_merge_threshold: usize,
+}
+
+impl Default for ManifestMergeOptions {
+ fn default() -> Self {
+ Self {
+ channel_size: 10,
+ merge_interval_seconds: 5,
+ min_merge_threshold: 10,
+ soft_merge_threshold: 50,
+ hard_merge_threshold: 90,
+ }
+ }
+}
+
+struct ManifestMerger {
+ snapshot_path: Path,
+ delta_dir: Path,
+ store: ObjectStoreRef,
+ sender: Sender<MergeType>,
+ receiver: RwLock<Receiver<MergeType>>,
+ deltas_num: AtomicUsize,
+ merge_options: ManifestMergeOptions,
+}
+
+impl ManifestMerger {
+ async fn try_new(
+ snapshot_path: Path,
+ delta_dir: Path,
+ store: ObjectStoreRef,
+ merge_options: ManifestMergeOptions,
+ ) -> Result<Arc<Self>> {
+ let (tx, rx) = mpsc::channel(merge_options.channel_size);
+ let merger = Self {
+ snapshot_path,
+ delta_dir,
+ store,
+ sender: tx,
+ receiver: RwLock::new(rx),
+ deltas_num: AtomicUsize::new(0),
+ merge_options,
+ };
+ // Merge all delta files when startup
+ merger.do_merge().await?;
+
+ Ok(Arc::new(merger))
+ }
+
+ async fn run(&self) {
+ let merge_interval =
Duration::from_secs(self.merge_options.merge_interval_seconds as u64);
+ let mut receiver = self.receiver.write().await;
+ loop {
+ tokio::select! {
+ _ = tokio::time::sleep(merge_interval) => {
+ if self.deltas_num.load(Ordering::Relaxed) >
self.merge_options.min_merge_threshold {
+ if let Err(err) = self.do_merge().await {
+ error!("Failed to merge delta, err:{err}");
+ }
+ }
+ }
+ _merge_type = receiver.recv() => {
+ if self.deltas_num.load(Ordering::Relaxed) >
self.merge_options.min_merge_threshold {
+ if let Err(err) = self.do_merge().await {
+ error!("Failed to merge delta, err:{err}");
+ }
+ }
+ }
+ }
+ }
+ }
+
+ fn schedule_merge(&self, task: MergeType) {
+ if let Err(err) = self.sender.try_send(task) {
+ error!("Failed to send merge task, err:{err}");
+ }
+ }
+
+ async fn maybe_schedule_merge(&self) -> Result<()> {
+ let current_num = self.deltas_num.load(Ordering::Relaxed);
+
+ if current_num > self.merge_options.hard_merge_threshold {
+ self.schedule_merge(MergeType::Hard);
+ return Err(AnyhowError::msg(format!(
+ "Manifest has too many delta files, value:{current_num}"
+ ))
+ .into());
+ } else if current_num > self.merge_options.soft_merge_threshold {
+ self.schedule_merge(MergeType::Soft);
+ }
+
+ Ok(())
+ }
+
+ fn add_delta_num(&self) {
+ self.deltas_num.fetch_add(1, Ordering::Relaxed);
+ }
+
+ async fn do_merge(&self) -> Result<()> {
+ let paths = self
+ .store
+ .list(Some(&self.delta_dir))
+ .map(|value| {
+ value
+ .map(|v| v.location)
+ .context("failed to get delta file path")
+ })
+ .try_collect::<Vec<_>>()
+ .await?;
+ if paths.is_empty() {
+ return Ok(());
+ }
+
+ let mut stream_read = FuturesUnordered::new();
+ for path in paths.clone() {
+ let store = self.store.clone();
+ // TODO: use thread pool to read manifest files
+ let handle = tokio::spawn(async move { read_delta_file(store,
path).await });
+ stream_read.push(handle);
+ }
+ let mut delta_files = Vec::with_capacity(stream_read.len());
+ while let Some(res) = stream_read.next().await {
+ let res = res.context("Failed to join read delta task")??;
+ delta_files.push(res);
+ }
+
+ let mut payload = read_snapshot(&self.store,
&self.snapshot_path).await?;
+ payload.files.extend(delta_files);
+ payload.dedup_files();
+
+ let pb_manifest = pb_types::Manifest {
+ files: payload
+ .files
+ .into_iter()
+ .map(|f| f.into())
+ .collect::<Vec<_>>(),
+ };
+ let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
+ pb_manifest
+ .encode(&mut buf)
+ .context("failed to encode manifest")?;
+ let put_payload = PutPayload::from_bytes(Bytes::from(buf));
+
+ // 1. Persist the snapshot
+ self.store
+ .put(&self.snapshot_path, put_payload)
+ .await
+ .context("Failed to update manifest")?;
+
+ // 2. Delete the merged manifest files
+ let mut stream_delete = FuturesUnordered::new();
+ for path in paths {
+ let store = self.store.clone();
+ // TODO: use thread pool to delete sst files
+ let handle = tokio::spawn(async move { delete_delta_file(store,
path).await });
+ stream_delete.push(handle);
+ }
+
+ while let Some(res) = stream_delete.next().await {
+ match res {
+ Err(e) => {
+ error!("Failed to join delete delta task, err:{e}")
+ }
+ Ok(v) => {
+ if let Err(e) = v {
+ error!("Failed to delete delta, err:{e}")
+ } else {
+ self.deltas_num.fetch_sub(1, Ordering::Relaxed);
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+}
+
+async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result<Payload>
{
+ match store.get(path).await {
+ Ok(v) => {
+ let bytes = v
+ .bytes()
+ .await
+ .context("failed to read manifest snapshot")?;
+ let pb_payload =
+ pb_types::Manifest::decode(bytes).context("failed to decode
manifest snapshot")?;
+ Payload::try_from(pb_payload)
+ }
+ Err(err) => {
+ if err.to_string().contains("not found") {
+ Ok(Payload { files: vec![] })
+ } else {
+ let context = format!("Failed to get manifest snapshot,
path:{path}");
+ Err(AnyhowError::new(err).context(context).into())
+ }
+ }
+ }
+}
+
+async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) ->
Result<SstFile> {
+ let bytes = store
+ .get(&sst_path)
+ .await
+ .with_context(|| format!("failed to get delta file, path:{sst_path}"))?
+ .bytes()
+ .await
+ .with_context(|| format!("failed to read delta file,
path:{sst_path}"))?;
+
+ let pb_sst = pb_types::SstFile::decode(bytes)
+ .with_context(|| format!("failed to decode delta file,
path:{sst_path}"))?;
+
+ let sst = SstFile::try_from(pb_sst)
+ .with_context(|| format!("failed to convert delta file,
path:{sst_path}"))?;
+ Ok(sst)
+}
+
+async fn delete_delta_file(store: ObjectStoreRef, path: Path) -> Result<()> {
+ store
+ .delete(&path)
+ .await
+ .with_context(|| format!("Failed to delete delta files,
path:{path}"))?;
+
+ Ok(())
+}
diff --git a/horaedb/metric_engine/src/storage.rs
b/horaedb/metric_engine/src/storage.rs
index 02816088..c8c832d2 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -56,7 +56,7 @@ use parquet::{
};
use crate::{
- manifest::Manifest,
+ manifest::{Manifest, ManifestMergeOptions},
read::{DefaultParquetFileReaderFactory, MergeExec},
sst::{allocate_id, FileId, FileMeta, SstFile},
types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult,
SEQ_COLUMN_NAME},
@@ -129,10 +129,15 @@ impl CloudObjectStorage {
arrow_schema: SchemaRef,
num_primary_keys: usize,
write_options: WriteOptions,
+ merge_options: ManifestMergeOptions,
) -> Result<Self> {
let manifest_prefix = crate::manifest::PREFIX_PATH;
- let manifest =
- Manifest::try_new(format!("{path}/{manifest_prefix}"),
store.clone()).await?;
+ let manifest = Manifest::try_new(
+ format!("{path}/{manifest_prefix}"),
+ store.clone(),
+ merge_options,
+ )
+ .await?;
let mut new_fields = arrow_schema.fields.clone().to_vec();
new_fields.push(Arc::new(Field::new(
SEQ_COLUMN_NAME,
@@ -409,7 +414,7 @@ mod tests {
use object_store::local::LocalFileSystem;
use super::*;
- use crate::{arrow_schema, types::Timestamp};
+ use crate::{arrow_schema, manifest::ManifestMergeOptions,
types::Timestamp};
#[tokio::test]
async fn test_build_scan_plan() {
@@ -422,6 +427,7 @@ mod tests {
schema.clone(),
1, // num_primary_keys
WriteOptions::default(),
+ ManifestMergeOptions::default(),
)
.await
.unwrap();
@@ -467,6 +473,7 @@ mod tests {
schema.clone(),
2, // num_primary_keys
WriteOptions::default(),
+ ManifestMergeOptions::default(),
)
.await
.unwrap();
@@ -543,6 +550,7 @@ mod tests {
schema.clone(),
1,
WriteOptions::default(),
+ ManifestMergeOptions::default(),
)
.await
.unwrap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]