This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 21765246 feat: use thread pool in manifest merge (#1599)
21765246 is described below
commit 21765246a6160094830ef2a2ce85dc849288fcac
Author: 鲍金日 <[email protected]>
AuthorDate: Mon Nov 25 14:13:21 2024 +0800
feat: use thread pool in manifest merge (#1599)
## Rationale
Manifest merge used to spawn its delta-file read and delete tasks with `tokio::spawn` on the shared runtime, leaving `TODO: use thread pool` markers in the code. Run that work on a dedicated thread pool instead, so heavy merges do not compete with the main runtime.
## Detailed Changes
- Add the `async-scoped` dependency and replace the `FuturesUnordered` + `tokio::spawn` pattern with `TokioScope::scope_and_block`, letting tasks borrow `&self.store` and `&paths` without cloning.
- Introduce `StorageRuntimes` with a dedicated `storage-compact` runtime, and pass it into `Manifest::try_new` so the background merger runs on it.
- Group `WriteOptions`, `ManifestMergeOptions`, and the new `RuntimeOptions` into a `StorageOptions` struct; move `ManifestMergeOptions` into `types.rs`.
- Derive `PartialEq`/`Eq` for `SstFile`, `FileMeta`, and `TimeRange`; add manifest find/merge tests.
## Test Plan
CI
---
horaedb/Cargo.lock | 34 ++++-
horaedb/Cargo.toml | 1 +
horaedb/metric_engine/Cargo.toml | 1 +
horaedb/metric_engine/src/manifest.rs | 233 ++++++++++++++++++++++++----------
horaedb/metric_engine/src/sst.rs | 4 +-
horaedb/metric_engine/src/storage.rs | 50 ++++++--
horaedb/metric_engine/src/types.rs | 41 +++++-
7 files changed, 281 insertions(+), 83 deletions(-)
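The core technique in this patch is `async_scoped`: unlike `tokio::spawn`, a scope's tasks may borrow from the enclosing stack frame, because `scope_and_block` does not return until every spawned task has finished. A minimal, self-contained sketch of that pattern (the `paths` data and `main` harness below are illustrative, not taken from this patch):

```rust
use async_scoped::TokioScope;

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // Enter the runtime so the scope can spawn onto it.
    let _guard = rt.enter();

    let paths = vec!["a".to_string(), "bb".to_string(), "ccc".to_string()];

    // `scope_and_block` blocks the current thread until every spawned
    // future has completed; that guarantee is what makes it sound for
    // the tasks to borrow `paths` instead of cloning per task.
    let (_, results) = TokioScope::scope_and_block(|scope| {
        for p in &paths {
            // `p: &String` is borrowed into the task, no `clone()` needed.
            scope.spawn(async move { p.len() });
        }
    });

    // Each entry is a Result<_, JoinError>, just as with tokio::spawn.
    let total: usize = results.into_iter().map(|r| r.unwrap()).sum();
    assert_eq!(total, 6);
}
```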
diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 1ab7b2bc..1d68e849 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -329,6 +329,17 @@ dependencies = [
"zstd-safe",
]
+[[package]]
+name = "async-scoped"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4042078ea593edffc452eef14e99fdb2b120caa4ad9618bcdeabc4a023b98740"
+dependencies = [
+ "futures",
+ "pin-project",
+ "tokio",
+]
+
[[package]]
name = "async-trait"
version = "0.1.82"
@@ -1530,7 +1541,7 @@ dependencies = [
[[package]]
name = "macros"
-version = "2.1.0"
+version = "2.2.0-dev"
[[package]]
name = "md-5"
@@ -1555,6 +1566,7 @@ dependencies = [
"anyhow",
"arrow",
"arrow-schema",
+ "async-scoped",
"async-trait",
"bytes",
"datafusion",
@@ -1880,6 +1892,26 @@ dependencies = [
"siphasher",
]
+[[package]]
+name = "pin-project"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "pin-project-lite"
version = "0.2.14"
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index 6f56f090..d40bdafd 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -47,6 +47,7 @@ itertools = "0.3"
lazy_static = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
+async-scoped = { version = "0.9.0", features = ["use-tokio"] }
# This profile optimizes for good runtime performance.
[profile.release]
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index ed237fcb..7235fc68 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -34,6 +34,7 @@ workspace = true
anyhow = { workspace = true }
arrow = { workspace = true }
arrow-schema = { workspace = true }
+async-scoped = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
datafusion = { workspace = true }
diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs
index f9e6cef3..e435e699 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -25,19 +25,23 @@ use std::{
};
use anyhow::Context;
+use async_scoped::TokioScope;
use bytes::Bytes;
-use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
use object_store::{path::Path, PutPayload};
use prost::Message;
-use tokio::sync::{
- mpsc::{self, Receiver, Sender},
- RwLock,
+use tokio::{
+ runtime::Runtime,
+ sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
+ },
};
use tracing::error;
use crate::{
sst::{FileId, FileMeta, SstFile},
- types::{ObjectStoreRef, TimeRange},
+ types::{ManifestMergeOptions, ObjectStoreRef, TimeRange},
AnyhowError, Error, Result,
};
@@ -96,6 +100,7 @@ impl Manifest {
pub async fn try_new(
root_dir: String,
store: ObjectStoreRef,
+ runtime: Arc<Runtime>,
merge_options: ManifestMergeOptions,
) -> Result<Self> {
let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
@@ -105,6 +110,7 @@ impl Manifest {
snapshot_path.clone(),
delta_dir.clone(),
store.clone(),
+ runtime.clone(),
merge_options,
)
.await?;
@@ -112,7 +118,7 @@ impl Manifest {
{
let merger = merger.clone();
// Start merger in background
- tokio::spawn(async move {
+ runtime.spawn(async move {
merger.run().await;
});
}
@@ -175,31 +181,11 @@ enum MergeType {
Soft,
}
-#[derive(Clone)]
-pub struct ManifestMergeOptions {
- channel_size: usize,
- merge_interval_seconds: usize,
- min_merge_threshold: usize,
- hard_merge_threshold: usize,
- soft_merge_threshold: usize,
-}
-
-impl Default for ManifestMergeOptions {
- fn default() -> Self {
- Self {
- channel_size: 10,
- merge_interval_seconds: 5,
- min_merge_threshold: 10,
- soft_merge_threshold: 50,
- hard_merge_threshold: 90,
- }
- }
-}
-
struct ManifestMerger {
snapshot_path: Path,
delta_dir: Path,
store: ObjectStoreRef,
+ runtime: Arc<Runtime>,
sender: Sender<MergeType>,
receiver: RwLock<Receiver<MergeType>>,
deltas_num: AtomicUsize,
@@ -211,6 +197,7 @@ impl ManifestMerger {
snapshot_path: Path,
delta_dir: Path,
store: ObjectStoreRef,
+ runtime: Arc<Runtime>,
merge_options: ManifestMergeOptions,
) -> Result<Arc<Self>> {
let (tx, rx) = mpsc::channel(merge_options.channel_size);
@@ -218,6 +205,7 @@ impl ManifestMerger {
snapshot_path,
delta_dir,
store,
+ runtime,
sender: tx,
receiver: RwLock::new(rx),
deltas_num: AtomicUsize::new(0),
@@ -279,31 +267,21 @@ impl ManifestMerger {
}
async fn do_merge(&self) -> Result<()> {
- let paths = self
- .store
- .list(Some(&self.delta_dir))
- .map(|value| {
- value
- .map(|v| v.location)
- .context("failed to get delta file path")
- })
- .try_collect::<Vec<_>>()
- .await?;
+ let paths = list_delta_paths(&self.store, &self.delta_dir).await?;
if paths.is_empty() {
return Ok(());
}
- let mut stream_read = FuturesUnordered::new();
- for path in paths.clone() {
- let store = self.store.clone();
- // TODO: use thread pool to read manifest files
- let handle = tokio::spawn(async move { read_delta_file(store, path).await });
- stream_read.push(handle);
- }
- let mut delta_files = Vec::with_capacity(stream_read.len());
- while let Some(res) = stream_read.next().await {
- let res = res.context("Failed to join read delta task")??;
- delta_files.push(res);
+ let (_, results) = TokioScope::scope_and_block(|scope| {
+ for path in &paths {
+ scope.spawn(async { read_delta_file(&self.store, path).await });
+ }
+ });
+
+ let mut delta_files = Vec::with_capacity(results.len());
+ for res in results {
+ let sst_file = res.context("Failed to join read delta files task")??;
+ delta_files.push(sst_file);
}
let mut payload = read_snapshot(&self.store, &self.snapshot_path).await?;
@@ -327,21 +305,19 @@ impl ManifestMerger {
self.store
.put(&self.snapshot_path, put_payload)
.await
- .context("Failed to update manifest")?;
+ .with_context(|| format!("Failed to update manifest, path:{}",
self.snapshot_path))?;
// 2. Delete the merged manifest files
- let mut stream_delete = FuturesUnordered::new();
- for path in paths {
- let store = self.store.clone();
- // TODO: use thread pool to delete sst files
- let handle = tokio::spawn(async move { delete_delta_file(store, path).await });
- stream_delete.push(handle);
- }
+ let (_, results) = TokioScope::scope_and_block(|scope| {
+ for path in &paths {
+ scope.spawn(async { delete_delta_file(&self.store, path).await });
+ }
+ });
- while let Some(res) = stream_delete.next().await {
+ for res in results {
match res {
Err(e) => {
- error!("Failed to join delete delta task, err:{e}")
+ error!("Failed to join delete delta files task, err:{e}")
}
Ok(v) => {
if let Err(e) = v {
@@ -363,25 +339,25 @@ async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result<Payload> {
let bytes = v
.bytes()
.await
- .context("failed to read manifest snapshot")?;
- let pb_payload =
- pb_types::Manifest::decode(bytes).context("failed to decode
manifest snapshot")?;
+ .with_context(|| format!("Failed to read manifest snapshot,
path:{path}"))?;
+ let pb_payload = pb_types::Manifest::decode(bytes)
+ .with_context(|| format!("Failed to decode manifest snapshot,
path:{path}"))?;
Payload::try_from(pb_payload)
}
Err(err) => {
if err.to_string().contains("not found") {
Ok(Payload { files: vec![] })
} else {
- let context = format!("Failed to get manifest snapshot,
path:{path}");
+ let context = format!("Failed to read manifest snapshot,
path:{path}");
Err(AnyhowError::new(err).context(context).into())
}
}
}
}
-async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) -> Result<SstFile> {
+async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> Result<SstFile> {
let bytes = store
- .get(&sst_path)
+ .get(sst_path)
.await
.with_context(|| format!("failed to get delta file, path:{sst_path}"))?
.bytes()
@@ -396,11 +372,136 @@ async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) -> Result<SstFil
Ok(sst)
}
-async fn delete_delta_file(store: ObjectStoreRef, path: Path) -> Result<()> {
+async fn delete_delta_file(store: &ObjectStoreRef, path: &Path) -> Result<()> {
store
- .delete(&path)
+ .delete(path)
.await
.with_context(|| format!("Failed to delete delta files,
path:{path}"))?;
Ok(())
}
+
+async fn list_delta_paths(store: &ObjectStoreRef, delta_dir: &Path) -> Result<Vec<Path>> {
+ let paths = store
+ .list(Some(delta_dir))
+ .map(|value| {
+ value
+ .map(|v| v.location)
+ .with_context(|| format!("Failed to list delta paths, delta
dir:{}", delta_dir))
+ })
+ .try_collect::<Vec<_>>()
+ .await?;
+
+ Ok(paths)
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{sync::Arc, thread::sleep};
+
+ use object_store::local::LocalFileSystem;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_find_manifest() {
+ let root_dir = temp_dir::TempDir::new().unwrap();
+ let runtime = tokio::runtime::Runtime::new().unwrap();
+ let store = Arc::new(LocalFileSystem::new());
+
+ let manifest = Manifest::try_new(
+ root_dir.path().to_string_lossy().to_string(),
+ store,
+ Arc::new(runtime),
+ ManifestMergeOptions::default(),
+ )
+ .await
+ .unwrap();
+
+ for i in 0..20 {
+ let time_range = (i..i + 1).into();
+ let meta = FileMeta {
+ max_sequence: i as u64,
+ num_rows: i as u32,
+ size: i as u32,
+ time_range,
+ };
+ manifest.add_file(i as u64, meta).await.unwrap();
+ }
+
+ let find_range = (10..15).into();
+ let mut ssts = manifest.find_ssts(&find_range).await;
+
+ let mut expected_ssts = (10..15)
+ .map(|i| {
+ let id = i as u64;
+ let time_range = (i..i + 1).into();
+ let meta = FileMeta {
+ max_sequence: i as u64,
+ num_rows: i as u32,
+ size: i as u32,
+ time_range,
+ };
+ SstFile { id, meta }
+ })
+ .collect::<Vec<_>>();
+
+ expected_ssts.sort_by(|a, b| a.id.cmp(&b.id));
+ ssts.sort_by(|a, b| a.id.cmp(&b.id));
+ assert_eq!(expected_ssts, ssts);
+ }
+
+ #[tokio::test]
+ async fn test_merge_manifest() {
+ let root_dir = temp_dir::TempDir::new()
+ .unwrap()
+ .path()
+ .to_string_lossy()
+ .to_string();
+ let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
+ let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+ let runtime = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(4)
+ .enable_all()
+ .build()
+ .unwrap();
+ let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
+
+ let manifest = Manifest::try_new(
+ root_dir,
+ store.clone(),
+ Arc::new(runtime),
+ ManifestMergeOptions {
+ merge_interval_seconds: 1,
+ ..Default::default()
+ },
+ )
+ .await
+ .unwrap();
+
+ // Add manifest files
+ for i in 0..20 {
+ let time_range = (i..i + 1).into();
+ let meta = FileMeta {
+ max_sequence: i as u64,
+ num_rows: i as u32,
+ size: i as u32,
+ time_range,
+ };
+ manifest.add_file(i as u64, meta).await.unwrap();
+ }
+
+ // Wait for merge manifest to finish
+ sleep(Duration::from_secs(2));
+
+ let mut mem_ssts = manifest.payload.read().await.files.clone();
+ let mut ssts = read_snapshot(&store, &snapshot_path).await.unwrap().files;
+
+ mem_ssts.sort_by(|a, b| a.id.cmp(&b.id));
+ ssts.sort_by(|a, b| a.id.cmp(&b.id));
+ assert_eq!(mem_ssts, ssts);
+
+ let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
+ assert!(delta_paths.is_empty());
+ }
+}
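Two spawn patterns appear in the manifest changes above: scoped spawns for the bounded read/delete fan-out, and a plain `runtime.spawn` onto the dedicated runtime for the long-lived merger. A hedged sketch of the latter wiring, where `Merger` and `start_merger` are stand-ins rather than the crate's `ManifestMerger`:

```rust
use std::sync::Arc;
use tokio::runtime::Runtime;

struct Merger;

impl Merger {
    async fn run(self: Arc<Self>) {
        // Stand-in for the periodic merge loop in ManifestMerger::run.
    }
}

fn start_merger(runtime: Arc<Runtime>) -> Arc<Merger> {
    let merger = Arc::new(Merger);
    {
        let merger = merger.clone();
        // Spawning on the explicit runtime (instead of tokio::spawn) pins
        // the merger to the dedicated pool, as Manifest::try_new now does.
        runtime.spawn(async move {
            merger.run().await;
        });
    }
    merger
}

fn main() {
    // A real caller would keep this runtime alive for the merger's lifetime.
    let rt = Arc::new(Runtime::new().unwrap());
    let _merger = start_merger(rt.clone());
}
```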
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 703b4bc4..644988f8 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -31,7 +31,7 @@ pub const PREFIX_PATH: &str = "data";
pub type FileId = u64;
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SstFile {
pub id: FileId,
pub meta: FileMeta,
@@ -58,7 +58,7 @@ impl From<SstFile> for pb_types::SstFile {
}
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMeta {
pub max_sequence: u64,
pub num_rows: u32,
diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs
index c8c832d2..ac37e9c8 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -54,12 +54,16 @@ use parquet::{
format::SortingColumn,
schema::types::ColumnPath,
};
+use tokio::runtime::Runtime;
use crate::{
- manifest::{Manifest, ManifestMergeOptions},
+ manifest::Manifest,
read::{DefaultParquetFileReaderFactory, MergeExec},
sst::{allocate_id, FileId, FileMeta, SstFile},
- types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult, SEQ_COLUMN_NAME},
+ types::{
+ ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, WriteOptions, WriteResult,
+ SEQ_COLUMN_NAME,
+ },
Result,
};
@@ -93,6 +97,25 @@ pub trait TimeMergeStorage {
async fn compact(&self, req: CompactRequest) -> Result<()>;
}
+struct StorageRuntimes {
+ compact_runtime: Arc<Runtime>,
+}
+
+impl StorageRuntimes {
+ pub fn new(runtime_opts: RuntimeOptions) -> Result<Self> {
+ let compact_runtime = tokio::runtime::Builder::new_multi_thread()
+ .thread_name("storage-compact")
+ .worker_threads(runtime_opts.compact_thread_num)
+ .enable_all()
+ .build()
+ .context("build storgae compact runtime")?;
+
+ Ok(Self {
+ compact_runtime: Arc::new(compact_runtime),
+ })
+ }
+}
+
/// `TimeMergeStorage` implementation using cloud object storage; it splits
/// data into different segments (aka `segment_duration`) based on time range.
///
@@ -105,6 +128,7 @@ pub struct CloudObjectStorage {
arrow_schema: SchemaRef,
num_primary_keys: usize,
manifest: Manifest,
+ runtimes: StorageRuntimes,
df_schema: DFSchema,
write_props: WriterProperties,
@@ -128,14 +152,16 @@ impl CloudObjectStorage {
store: ObjectStoreRef,
arrow_schema: SchemaRef,
num_primary_keys: usize,
- write_options: WriteOptions,
- merge_options: ManifestMergeOptions,
+ storage_opts: StorageOptions,
) -> Result<Self> {
let manifest_prefix = crate::manifest::PREFIX_PATH;
+ let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
+
let manifest = Manifest::try_new(
format!("{path}/{manifest_prefix}"),
store.clone(),
- merge_options,
+ runtimes.compact_runtime.clone(),
+ storage_opts.manifest_merge_opts,
)
.await?;
let mut new_fields = arrow_schema.fields.clone().to_vec();
@@ -149,7 +175,7 @@ impl CloudObjectStorage {
arrow_schema.metadata.clone(),
));
let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
- let write_props = Self::build_write_props(write_options, num_primary_keys);
+ let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys);
Ok(Self {
path,
num_primary_keys,
@@ -157,6 +183,7 @@ impl CloudObjectStorage {
store,
arrow_schema,
manifest,
+ runtimes,
df_schema,
write_props,
})
@@ -414,7 +441,7 @@ mod tests {
use object_store::local::LocalFileSystem;
use super::*;
- use crate::{arrow_schema, manifest::ManifestMergeOptions, types::Timestamp};
+ use crate::{arrow_schema, types::Timestamp};
#[tokio::test]
async fn test_build_scan_plan() {
@@ -426,8 +453,7 @@ mod tests {
store,
schema.clone(),
1, // num_primary_keys
- WriteOptions::default(),
- ManifestMergeOptions::default(),
+ StorageOptions::default(),
)
.await
.unwrap();
@@ -472,8 +498,7 @@ mod tests {
store,
schema.clone(),
2, // num_primary_keys
- WriteOptions::default(),
- ManifestMergeOptions::default(),
+ StorageOptions::default(),
)
.await
.unwrap();
@@ -549,8 +574,7 @@ mod tests {
store,
schema.clone(),
1,
- WriteOptions::default(),
- ManifestMergeOptions::default(),
+ StorageOptions::default(),
)
.await
.unwrap();
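The `StorageRuntimes` introduced above builds the compact pool once and shares it via `Arc` with the manifest merger. A standalone sketch of that construction using the same builder calls, with the helper name invented here and error handling reduced to `expect`:

```rust
use std::sync::Arc;
use tokio::runtime::Runtime;

// Illustrative helper mirroring StorageRuntimes::new from this patch.
fn build_compact_runtime(compact_thread_num: usize) -> Arc<Runtime> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .thread_name("storage-compact")     // visible in thread dumps
        .worker_threads(compact_thread_num) // from RuntimeOptions
        .enable_all()                       // timers + IO for merge work
        .build()
        .expect("build storage compact runtime");
    Arc::new(rt)
}

fn main() {
    let rt = build_compact_runtime(4);
    rt.block_on(async { /* merge work would run here */ });
}
```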
diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs
index 624b8a2e..98ba9764 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -68,7 +68,7 @@ impl Timestamp {
pub const MIN: Timestamp = Timestamp(i64::MIN);
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TimeRange(Range<Timestamp>);
impl From<Range<Timestamp>> for TimeRange {
@@ -146,3 +146,42 @@ impl Default for WriteOptions {
}
}
}
+
+pub struct RuntimeOptions {
+ pub compact_thread_num: usize,
+}
+
+impl Default for RuntimeOptions {
+ fn default() -> Self {
+ Self {
+ compact_thread_num: 4,
+ }
+ }
+}
+
+pub struct ManifestMergeOptions {
+ pub channel_size: usize,
+ pub merge_interval_seconds: usize,
+ pub min_merge_threshold: usize,
+ pub hard_merge_threshold: usize,
+ pub soft_merge_threshold: usize,
+}
+
+impl Default for ManifestMergeOptions {
+ fn default() -> Self {
+ Self {
+ channel_size: 10,
+ merge_interval_seconds: 5,
+ min_merge_threshold: 10,
+ soft_merge_threshold: 50,
+ hard_merge_threshold: 90,
+ }
+ }
+}
+
+#[derive(Default)]
+pub struct StorageOptions {
+ pub write_opts: WriteOptions,
+ pub manifest_merge_opts: ManifestMergeOptions,
+ pub runtime_opts: RuntimeOptions,
+}
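Finally, a hedged usage sketch of the consolidated options: callers that previously passed `WriteOptions` and `ManifestMergeOptions` separately now build a single `StorageOptions`, with struct-update syntax filling unspecified fields from `Default`. The crate path `metric_engine::types` is assumed here, and the field values are examples only:

```rust
use metric_engine::types::{ManifestMergeOptions, RuntimeOptions, StorageOptions};

fn main() {
    let opts = StorageOptions {
        runtime_opts: RuntimeOptions { compact_thread_num: 8 },
        manifest_merge_opts: ManifestMergeOptions {
            merge_interval_seconds: 1,
            ..Default::default()
        },
        // write_opts keeps its defaults via struct update.
        ..Default::default()
    };
    // `opts` would be passed to the CloudObjectStorage constructor.
    let _ = opts;
}
```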
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]