This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new d2f878f1 feat: add server config (#1615)
d2f878f1 is described below
commit d2f878f1b718e0b16fe70c8983a3d919c1d8d5a1
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Dec 20 10:17:18 2024 +0800
feat: add server config (#1615)
## Rationale
## Detailed Changes
- Server can read config from cli args
- Start 4 write workers to benchmark writes.
## Test Plan
CI
---
Cargo.lock | 36 +++
Cargo.toml | 10 +-
src/server/Cargo.toml => docs/example.toml | 27 +-
licenserc.toml | 26 +-
src/benchmarks/Cargo.toml | 1 +
src/benchmarks/src/config.rs | 3 +-
src/{server => common}/Cargo.toml | 12 +-
.../src/util.rs => common/src/lib.rs} | 11 +-
src/common/src/size_ext.rs | 295 +++++++++++++++++++++
src/common/src/time_ext.rs | 288 ++++++++++++++++++++
src/metric_engine/Cargo.toml | 1 +
src/metric_engine/src/compaction/picker.rs | 3 +-
src/metric_engine/src/lib.rs | 1 -
src/metric_engine/src/storage.rs | 103 ++++---
src/metric_engine/src/types.rs | 16 --
src/server/Cargo.toml | 5 +
src/server/src/config.rs | 159 +++++++++++
src/server/src/main.rs | 159 ++++++++---
18 files changed, 988 insertions(+), 168 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index ac89a9fb..b5c4a8d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -630,6 +630,7 @@ name = "benchmarks"
version = "2.2.0-alpha"
dependencies = [
"bytes",
+ "common",
"criterion",
"metric_engine",
"pb_types",
@@ -859,6 +860,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84"
dependencies = [
"clap_builder",
+ "clap_derive",
]
[[package]]
@@ -867,8 +869,22 @@ version = "4.5.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838"
dependencies = [
+ "anstream",
"anstyle",
"clap_lex",
+ "strsim",
+]
+
+[[package]]
+name = "clap_derive"
+version = "4.5.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn",
]
[[package]]
@@ -894,6 +910,14 @@ dependencies = [
"unicode-width",
]
+[[package]]
+name = "common"
+version = "2.2.0-alpha"
+dependencies = [
+ "serde",
+ "toml",
+]
+
[[package]]
name = "const-random"
version = "0.1.18"
@@ -2141,6 +2165,7 @@ dependencies = [
"byteorder",
"bytes",
"bytesize",
+ "common",
"datafusion",
"futures",
"itertools 0.3.25",
@@ -2881,10 +2906,15 @@ version = "2.2.0-alpha"
dependencies = [
"actix-web",
"arrow",
+ "clap",
+ "common",
"futures",
"metric_engine",
"object_store",
+ "rand",
+ "serde",
"tokio",
+ "toml",
"tracing",
"tracing-subscriber",
]
@@ -3020,6 +3050,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+[[package]]
+name = "strsim"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
+
[[package]]
name = "strum"
version = "0.26.3"
diff --git a/Cargo.toml b/Cargo.toml
index d81bca03..9f5df523 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,11 +23,18 @@ license = "Apache-2.0"
[workspace]
resolver = "2"
-members = ["src/benchmarks", "src/metric_engine", "src/pb_types", "src/server"]
+members = [
+ "src/benchmarks",
+ "src/common",
+ "src/metric_engine",
+ "src/pb_types",
+ "src/server"
+]
[workspace.dependencies]
anyhow = { version = "1.0" }
metric_engine = { path = "src/metric_engine" }
+common = { path = "src/common" }
thiserror = "1"
bytes = "1"
byteorder = "1"
@@ -38,6 +45,7 @@ pb_types = { path = "src/pb_types" }
prost = { version = "0.13" }
arrow = { version = "53", features = ["prettyprint"] }
bytesize = "1"
+clap = "4"
arrow-schema = "53"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
diff --git a/src/server/Cargo.toml b/docs/example.toml
similarity index 64%
copy from src/server/Cargo.toml
copy to docs/example.toml
index 03ad7929..0fbb0220 100644
--- a/src/server/Cargo.toml
+++ b/docs/example.toml
@@ -15,27 +15,10 @@
# specific language governing permissions and limitations
# under the License.
-[package]
-name = "server"
+port = 5000
-[package.license]
-workspace = true
+[metric_engine]
-[package.version]
-workspace = true
-
-[package.authors]
-workspace = true
-
-[package.edition]
-workspace = true
-
-[dependencies]
-actix-web = "4"
-arrow = { workspace = true }
-futures = { workspace = true }
-metric_engine = { workspace = true }
-object_store = { workspace = true }
-tokio = { workspace = true }
-tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
+[metric_engine.storage]
+type = "Local"
+data_dir = "/tmp/horaedb-storage"
\ No newline at end of file
diff --git a/licenserc.toml b/licenserc.toml
index 9bace81e..9bc60b63 100644
--- a/licenserc.toml
+++ b/licenserc.toml
@@ -18,28 +18,6 @@
headerPath = "Apache-2.0-ASF.txt"
excludes = [
- # Derived
-
"horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go",
-
"horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go",
- "horaemeta/server/service/http/route.go",
- "src/server/src/federated.rs",
- "src/server/src/session.rs",
- "src/components/skiplist/benches/bench.rs",
- "src/components/skiplist/src/lib.rs",
- "src/components/skiplist/src/key.rs",
- "src/components/skiplist/src/list.rs",
- "src/components/skiplist/tests/tests.rs",
- "src/components/skiplist/Cargo.toml",
- "src/components/size_ext/src/lib.rs",
- "src/components/future_ext/src/cancel.rs",
- "src/components/tracing_util/src/lib.rs",
- "src/components/tracing_util/src/logging.rs",
- "src/components/tracing_util/Cargo.toml",
- "DISCLAIMER",
- "NOTICE",
- "horaemeta/DEPENDENCIES.csv",
- "DEPENDENCIES.tsv",
- # Test files
- "*snap",
- "*result"
+ # Forked
+ "src/common/src/size_ext.rs",
]
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 788c56a3..077c42dd 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -32,6 +32,7 @@ workspace = true
[dependencies]
bytes = { workspace = true }
+common = { workspace = true }
metric_engine = { workspace = true }
pb_types = { workspace = true }
prost = { workspace = true }
diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs
index 4da29396..27577f25 100644
--- a/src/benchmarks/src/config.rs
+++ b/src/benchmarks/src/config.rs
@@ -19,10 +19,9 @@
use std::{env, fs};
+use common::ReadableDuration;
use serde::Deserialize;
use tracing::info;
-
-use crate::util::ReadableDuration;
const BENCH_CONFIG_PATH_KEY: &str = "BENCH_CONFIG_PATH";
#[derive(Debug, Deserialize)]
diff --git a/src/server/Cargo.toml b/src/common/Cargo.toml
similarity index 78%
copy from src/server/Cargo.toml
copy to src/common/Cargo.toml
index 03ad7929..f8df372e 100644
--- a/src/server/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
-name = "server"
+name = "common"
[package.license]
workspace = true
@@ -31,11 +31,5 @@ workspace = true
workspace = true
[dependencies]
-actix-web = "4"
-arrow = { workspace = true }
-futures = { workspace = true }
-metric_engine = { workspace = true }
-object_store = { workspace = true }
-tokio = { workspace = true }
-tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
+serde = { workspace = true }
+toml = { workspace = true }
diff --git a/src/metric_engine/src/util.rs b/src/common/src/lib.rs
similarity index 78%
rename from src/metric_engine/src/util.rs
rename to src/common/src/lib.rs
index f48e8834..eb405e27 100644
--- a/src/metric_engine/src/util.rs
+++ b/src/common/src/lib.rs
@@ -15,11 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use std::time::{SystemTime, UNIX_EPOCH};
+mod size_ext;
+mod time_ext;
-/// Current time in milliseconds.
-pub fn now() -> i64 {
- let now = SystemTime::now();
- let duration = now.duration_since(UNIX_EPOCH).unwrap();
- duration.as_millis() as i64
-}
+pub use size_ext::ReadableSize;
+pub use time_ext::{now, ReadableDuration};
diff --git a/src/common/src/size_ext.rs b/src/common/src/size_ext.rs
new file mode 100644
index 00000000..03c65e86
--- /dev/null
+++ b/src/common/src/size_ext.rs
@@ -0,0 +1,295 @@
+// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
+
+// This module is forked from tikv and remove unnecessary code.
+//
https://github.com/tikv/tikv/blob/7ef4aa9549f136cabdac7e5177f39fe16631b514/components/tikv_util/src/config.rs#L76
+use std::{
+ fmt::{self, Write},
+ ops::{Div, Mul},
+ str::{self, FromStr},
+};
+
+use serde::{
+ de::{self, Unexpected, Visitor},
+ Deserialize, Deserializer, Serialize, Serializer,
+};
+
+const UNIT: u64 = 1;
+
+const BINARY_DATA_MAGNITUDE: u64 = 1024;
+pub const B: u64 = UNIT;
+pub const KIB: u64 = B * BINARY_DATA_MAGNITUDE;
+pub const MIB: u64 = KIB * BINARY_DATA_MAGNITUDE;
+pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE;
+pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE;
+pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE;
+
+#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd)]
+pub struct ReadableSize(pub u64);
+
+impl ReadableSize {
+ pub const fn kb(count: u64) -> ReadableSize {
+ ReadableSize(count * KIB)
+ }
+
+ pub const fn mb(count: u64) -> ReadableSize {
+ ReadableSize(count * MIB)
+ }
+
+ pub const fn gb(count: u64) -> ReadableSize {
+ ReadableSize(count * GIB)
+ }
+
+ pub const fn as_mb(self) -> u64 {
+ self.0 / MIB
+ }
+
+ pub const fn as_byte(self) -> u64 {
+ self.0
+ }
+}
+
+impl Div<u64> for ReadableSize {
+ type Output = ReadableSize;
+
+ fn div(self, rhs: u64) -> ReadableSize {
+ ReadableSize(self.0 / rhs)
+ }
+}
+
+impl Div<ReadableSize> for ReadableSize {
+ type Output = u64;
+
+ fn div(self, rhs: ReadableSize) -> u64 {
+ self.0 / rhs.0
+ }
+}
+
+impl Mul<u64> for ReadableSize {
+ type Output = ReadableSize;
+
+ fn mul(self, rhs: u64) -> ReadableSize {
+ ReadableSize(self.0 * rhs)
+ }
+}
+
+impl Serialize for ReadableSize {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let size = self.0;
+ let mut buffer = String::new();
+ if size == 0 {
+ write!(buffer, "{size}KiB").unwrap();
+ } else if size % PIB == 0 {
+ write!(buffer, "{}PiB", size / PIB).unwrap();
+ } else if size % TIB == 0 {
+ write!(buffer, "{}TiB", size / TIB).unwrap();
+ } else if size % GIB == 0 {
+ write!(buffer, "{}GiB", size / GIB).unwrap();
+ } else if size % MIB == 0 {
+ write!(buffer, "{}MiB", size / MIB).unwrap();
+ } else if size % KIB == 0 {
+ write!(buffer, "{}KiB", size / KIB).unwrap();
+ } else {
+ return serializer.serialize_u64(size);
+ }
+ serializer.serialize_str(&buffer)
+ }
+}
+
+impl FromStr for ReadableSize {
+ type Err = String;
+
+ // This method parses value in binary unit.
+ fn from_str(s: &str) -> Result<ReadableSize, String> {
+ let size_str = s.trim();
+ if size_str.is_empty() {
+ return Err(format!("{s:?} is not a valid size."));
+ }
+
+ if !size_str.is_ascii() {
+ return Err(format!("ASCII string is expected, but got {s:?}"));
+ }
+
+ // size: digits and '.' as decimal separator
+ let size_len = size_str
+ .to_string()
+ .chars()
+ .take_while(|c| char::is_ascii_digit(c) || ['.', 'e', 'E', '-',
'+'].contains(c))
+ .count();
+
+ // unit: alphabetic characters
+ let (size, unit) = size_str.split_at(size_len);
+
+ let unit = match unit.trim() {
+ "K" | "KB" | "KiB" => KIB,
+ "M" | "MB" | "MiB" => MIB,
+ "G" | "GB" | "GiB" => GIB,
+ "T" | "TB" | "TiB" => TIB,
+ "P" | "PB" | "PiB" => PIB,
+ "B" | "" => UNIT,
+ _ => {
+ return Err(format!(
+ "only B, KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, and PiB
are supported: {s:?}"
+ ));
+ }
+ };
+
+ match size.parse::<f64>() {
+ Ok(n) => Ok(ReadableSize((n * unit as f64) as u64)),
+ Err(_) => Err(format!("invalid size string: {s:?}")),
+ }
+ }
+}
+
+impl<'de> Deserialize<'de> for ReadableSize {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ struct SizeVisitor;
+
+ impl Visitor<'_> for SizeVisitor {
+ type Value = ReadableSize;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ formatter.write_str("valid size")
+ }
+
+ fn visit_i64<E>(self, size: i64) -> Result<ReadableSize, E>
+ where
+ E: de::Error,
+ {
+ if size >= 0 {
+ self.visit_u64(size as u64)
+ } else {
+ Err(E::invalid_value(Unexpected::Signed(size), &self))
+ }
+ }
+
+ fn visit_u64<E>(self, size: u64) -> Result<ReadableSize, E>
+ where
+ E: de::Error,
+ {
+ Ok(ReadableSize(size))
+ }
+
+ fn visit_str<E>(self, size_str: &str) -> Result<ReadableSize, E>
+ where
+ E: de::Error,
+ {
+ size_str.parse().map_err(E::custom)
+ }
+ }
+
+ deserializer.deserialize_any(SizeVisitor)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_readable_size() {
+ let s = ReadableSize::kb(2);
+ assert_eq!(s.0, 2048);
+ assert_eq!(s.as_mb(), 0);
+ let s = ReadableSize::mb(2);
+ assert_eq!(s.0, 2 * 1024 * 1024);
+ assert_eq!(s.as_mb(), 2);
+ let s = ReadableSize::gb(2);
+ assert_eq!(s.0, 2 * 1024 * 1024 * 1024);
+ assert_eq!(s.as_mb(), 2048);
+
+ assert_eq!((ReadableSize::mb(2) / 2).0, MIB);
+ assert_eq!((ReadableSize::mb(1) / 2).0, 512 * KIB);
+ assert_eq!(ReadableSize::mb(2) / ReadableSize::kb(1), 2048);
+ }
+
+ #[test]
+ fn test_parse_readable_size() {
+ #[derive(Serialize, Deserialize)]
+ struct SizeHolder {
+ s: ReadableSize,
+ }
+
+ let legal_cases = vec![
+ (0, "0KiB"),
+ (2 * KIB, "2KiB"),
+ (4 * MIB, "4MiB"),
+ (5 * GIB, "5GiB"),
+ (7 * TIB, "7TiB"),
+ (11 * PIB, "11PiB"),
+ ];
+ for (size, exp) in legal_cases {
+ let c = SizeHolder {
+ s: ReadableSize(size),
+ };
+ let res_str = toml::to_string(&c).unwrap();
+ let exp_str = format!("s = {exp:?}\n");
+ assert_eq!(res_str, exp_str);
+ let res_size: SizeHolder = toml::from_str(&exp_str).unwrap();
+ assert_eq!(res_size.s.0, size);
+ }
+
+ let c = SizeHolder {
+ s: ReadableSize(512),
+ };
+ let res_str = toml::to_string(&c).unwrap();
+ assert_eq!(res_str, "s = 512\n");
+ let res_size: SizeHolder = toml::from_str(&res_str).unwrap();
+ assert_eq!(res_size.s.0, c.s.0);
+
+ let decode_cases = vec![
+ (" 0.5 PB", PIB / 2),
+ ("0.5 TB", TIB / 2),
+ ("0.5GB ", GIB / 2),
+ ("0.5MB", MIB / 2),
+ ("0.5KB", KIB / 2),
+ ("0.5P", PIB / 2),
+ ("0.5T", TIB / 2),
+ ("0.5G", GIB / 2),
+ ("0.5M", MIB / 2),
+ ("0.5K", KIB / 2),
+ ("23", 23),
+ ("1", 1),
+ ("1024B", KIB),
+ // units with binary prefixes
+ (" 0.5 PiB", PIB / 2),
+ ("1PiB", PIB),
+ ("0.5 TiB", TIB / 2),
+ ("2 TiB", TIB * 2),
+ ("0.5GiB ", GIB / 2),
+ ("787GiB ", GIB * 787),
+ ("0.5MiB", MIB / 2),
+ ("3MiB", MIB * 3),
+ ("0.5KiB", KIB / 2),
+ ("1 KiB", KIB),
+ // scientific notation
+ ("0.5e6 B", B * 500000),
+ ("0.5E6 B", B * 500000),
+ ("1e6B", B * 1000000),
+ ("8E6B", B * 8000000),
+ ("8e7", B * 80000000),
+ ("1e-1MB", MIB / 10),
+ ("1e+1MB", MIB * 10),
+ ("0e+10MB", 0),
+ ];
+ for (src, exp) in decode_cases {
+ let src = format!("s = {src:?}");
+ let res: SizeHolder = toml::from_str(&src).unwrap();
+ assert_eq!(res.s.0, exp);
+ }
+
+ let illegal_cases = vec![
+ "0.5kb", "0.5kB", "0.5Kb", "0.5k", "0.5g", "b", "gb", "1b", "B",
"1K24B", " 5_KB",
+ "4B7", "5M_",
+ ];
+ for src in illegal_cases {
+ let src_str = format!("s = {src:?}");
+ assert!(toml::from_str::<SizeHolder>(&src_str).is_err(), "{}",
src);
+ }
+ }
+}
diff --git a/src/common/src/time_ext.rs b/src/common/src/time_ext.rs
new file mode 100644
index 00000000..87b61d84
--- /dev/null
+++ b/src/common/src/time_ext.rs
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ fmt::{self, Write},
+ str::FromStr,
+ time::{Duration, SystemTime, UNIX_EPOCH},
+};
+
+use serde::{
+ de::{self, Visitor},
+ Deserialize, Deserializer, Serialize, Serializer,
+};
+
+const TIME_MAGNITUDE_1: u64 = 1000;
+const TIME_MAGNITUDE_2: u64 = 60;
+const TIME_MAGNITUDE_3: u64 = 24;
+const UNIT: u64 = 1;
+const MS: u64 = UNIT;
+const SECOND: u64 = MS * TIME_MAGNITUDE_1;
+const MINUTE: u64 = SECOND * TIME_MAGNITUDE_2;
+const HOUR: u64 = MINUTE * TIME_MAGNITUDE_2;
+const DAY: u64 = HOUR * TIME_MAGNITUDE_3;
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)]
+pub struct ReadableDuration(pub Duration);
+
+impl ReadableDuration {
+ pub const fn secs(secs: u64) -> ReadableDuration {
+ ReadableDuration(Duration::from_secs(secs))
+ }
+
+ pub const fn millis(millis: u64) -> ReadableDuration {
+ ReadableDuration(Duration::from_millis(millis))
+ }
+
+ pub const fn minutes(minutes: u64) -> ReadableDuration {
+ ReadableDuration::secs(minutes * 60)
+ }
+
+ pub const fn hours(hours: u64) -> ReadableDuration {
+ ReadableDuration::minutes(hours * 60)
+ }
+
+ pub const fn days(days: u64) -> ReadableDuration {
+ ReadableDuration::hours(days * 24)
+ }
+
+ pub fn as_secs(&self) -> u64 {
+ self.0.as_secs()
+ }
+
+ pub fn as_millis(&self) -> u64 {
+ self.0.as_millis() as u64
+ }
+
+ pub fn is_zero(&self) -> bool {
+ self.0.as_nanos() == 0
+ }
+}
+
+impl From<Duration> for ReadableDuration {
+ fn from(t: Duration) -> ReadableDuration {
+ ReadableDuration(t)
+ }
+}
+
+impl From<ReadableDuration> for Duration {
+ fn from(readable: ReadableDuration) -> Duration {
+ readable.0
+ }
+}
+
+impl FromStr for ReadableDuration {
+ type Err = String;
+
+ fn from_str(dur_str: &str) -> std::result::Result<ReadableDuration,
String> {
+ let dur_str = dur_str.trim();
+ if !dur_str.is_ascii() {
+ return Err(format!("unexpected ascii string: {dur_str}"));
+ }
+ let err_msg = "valid duration, only d, h, m, s, ms are
supported.".to_owned();
+ let mut left = dur_str.as_bytes();
+ let mut last_unit = DAY + 1;
+ let mut dur = 0f64;
+ while let Some(idx) = left.iter().position(|c| b"dhms".contains(c)) {
+ let (first, second) = left.split_at(idx);
+ let unit = if second.starts_with(b"ms") {
+ left = &left[idx + 2..];
+ MS
+ } else {
+ let u = match second[0] {
+ b'd' => DAY,
+ b'h' => HOUR,
+ b'm' => MINUTE,
+ b's' => SECOND,
+ _ => return Err(err_msg),
+ };
+ left = &left[idx + 1..];
+ u
+ };
+ if unit >= last_unit {
+ return Err("d, h, m, s, ms should occur in given
order.".to_owned());
+ }
+ // do we need to check 12h360m?
+ let number_str = unsafe { std::str::from_utf8_unchecked(first) };
+ dur += match number_str.trim().parse::<f64>() {
+ Ok(n) => n * unit as f64,
+ Err(_) => return Err(err_msg),
+ };
+ last_unit = unit;
+ }
+ if !left.is_empty() {
+ return Err(err_msg);
+ }
+ if dur.is_sign_negative() {
+ return Err("duration should be positive.".to_owned());
+ }
+ let secs = dur as u64 / SECOND;
+ let millis = (dur as u64 % SECOND) as u32 * 1_000_000;
+ Ok(ReadableDuration(Duration::new(secs, millis)))
+ }
+}
+
+impl fmt::Display for ReadableDuration {
+ #[inline]
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let mut dur = self.0.as_millis() as u64;
+ let mut written = false;
+ if dur >= DAY {
+ written = true;
+ write!(f, "{}d", dur / DAY)?;
+ dur %= DAY;
+ }
+ if dur >= HOUR {
+ written = true;
+ write!(f, "{}h", dur / HOUR)?;
+ dur %= HOUR;
+ }
+ if dur >= MINUTE {
+ written = true;
+ write!(f, "{}m", dur / MINUTE)?;
+ dur %= MINUTE;
+ }
+ if dur >= SECOND {
+ written = true;
+ write!(f, "{}s", dur / SECOND)?;
+ dur %= SECOND;
+ }
+ if dur > 0 {
+ written = true;
+ write!(f, "{dur}ms")?;
+ }
+ if !written {
+ write!(f, "0s")?;
+ }
+ Ok(())
+ }
+}
+
+impl Serialize for ReadableDuration {
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: Serializer,
+ {
+ let mut buffer = String::new();
+ write!(buffer, "{self}").unwrap();
+ serializer.serialize_str(&buffer)
+ }
+}
+
+impl<'de> Deserialize<'de> for ReadableDuration {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ struct DurVisitor;
+
+ impl Visitor<'_> for DurVisitor {
+ type Value = ReadableDuration;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ formatter.write_str("valid duration")
+ }
+
+ fn visit_str<E>(self, dur_str: &str) ->
std::result::Result<ReadableDuration, E>
+ where
+ E: de::Error,
+ {
+ dur_str.parse().map_err(E::custom)
+ }
+ }
+
+ deserializer.deserialize_str(DurVisitor)
+ }
+}
+
+/// Current time in milliseconds.
+pub fn now() -> i64 {
+ let now = SystemTime::now();
+ let duration = now.duration_since(UNIX_EPOCH).unwrap();
+ duration.as_millis() as i64
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_duration_construction() {
+ let mut dur = ReadableDuration::secs(1);
+ assert_eq!(dur.0, Duration::new(1, 0));
+ assert_eq!(dur.as_secs(), 1);
+ assert_eq!(dur.as_millis(), 1000);
+ dur = ReadableDuration::millis(1001);
+ assert_eq!(dur.0, Duration::new(1, 1_000_000));
+ assert_eq!(dur.as_secs(), 1);
+ assert_eq!(dur.as_millis(), 1001);
+ dur = ReadableDuration::minutes(2);
+ assert_eq!(dur.0, Duration::new(2 * 60, 0));
+ assert_eq!(dur.as_secs(), 120);
+ assert_eq!(dur.as_millis(), 120000);
+ dur = ReadableDuration::hours(2);
+ assert_eq!(dur.0, Duration::new(2 * 3600, 0));
+ assert_eq!(dur.as_secs(), 7200);
+ assert_eq!(dur.as_millis(), 7200000);
+ }
+
+ #[test]
+ fn test_parse_readable_duration() {
+ #[derive(Serialize, Deserialize)]
+ struct DurHolder {
+ d: ReadableDuration,
+ }
+
+ let legal_cases = vec![
+ (0, 0, "0s"),
+ (0, 1, "1ms"),
+ (2, 0, "2s"),
+ (24 * 3600, 0, "1d"),
+ (2 * 24 * 3600, 10, "2d10ms"),
+ (4 * 60, 0, "4m"),
+ (5 * 3600, 0, "5h"),
+ (3600 + 2 * 60, 0, "1h2m"),
+ (5 * 24 * 3600 + 3600 + 2 * 60, 0, "5d1h2m"),
+ (3600 + 2, 5, "1h2s5ms"),
+ (3 * 24 * 3600 + 7 * 3600 + 2, 5, "3d7h2s5ms"),
+ ];
+ for (secs, ms, exp) in legal_cases {
+ let d = DurHolder {
+ d: ReadableDuration(Duration::new(secs, ms * 1_000_000)),
+ };
+ let res_str = toml::to_string(&d).unwrap();
+ let exp_str = format!("d = {exp:?}\n");
+ assert_eq!(res_str, exp_str);
+ let res_dur: DurHolder = toml::from_str(&exp_str).unwrap();
+ assert_eq!(res_dur.d.0, d.d.0);
+ }
+
+ let decode_cases = vec![(" 0.5 h2m ", 3600 / 2 + 2 * 60, 0)];
+ for (src, secs, ms) in decode_cases {
+ let src = format!("d = {src:?}");
+ let res: DurHolder = toml::from_str(&src).unwrap();
+ assert_eq!(res.d.0, Duration::new(secs, ms * 1_000_000));
+ }
+
+ let illegal_cases = vec!["1H", "1M", "1S", "1MS", "1h1h", "h"];
+ for src in illegal_cases {
+ let src_str = format!("d = {src:?}");
+ assert!(toml::from_str::<DurHolder>(&src_str).is_err(), "{}", src);
+ }
+ assert!(toml::from_str::<DurHolder>("d = 23").is_err());
+ }
+}
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index ca8dc576..ba6d2daa 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -39,6 +39,7 @@ async-trait = { workspace = true }
byteorder = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
+common = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
diff --git a/src/metric_engine/src/compaction/picker.rs
b/src/metric_engine/src/compaction/picker.rs
index 9107e3ba..28e08688 100644
--- a/src/metric_engine/src/compaction/picker.rs
+++ b/src/metric_engine/src/compaction/picker.rs
@@ -17,9 +17,10 @@
use std::{collections::BTreeMap, time::Duration};
+use common::now;
use tracing::debug;
-use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile,
types::Timestamp, util::now};
+use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile,
types::Timestamp};
pub struct Picker {
manifest: ManifestRef,
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs
index 3c2334de..2bbcdd6b 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/lib.rs
@@ -29,6 +29,5 @@ pub mod storage;
#[cfg(test)]
mod test_util;
pub mod types;
-pub(crate) mod util;
pub use error::{AnyhowError, Error, Result};
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index e5b6acf0..72107abd 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -55,8 +55,8 @@ use crate::{
read::ParquetReader,
sst::{allocate_id, FileMeta, SstPathGenerator},
types::{
- ObjectStoreRef, RuntimeOptions, StorageOptions, StorageSchema,
TimeRange, WriteOptions,
- WriteResult, SEQ_COLUMN_NAME,
+ ObjectStoreRef, StorageOptions, StorageSchema, TimeRange,
WriteOptions, WriteResult,
+ SEQ_COLUMN_NAME,
},
Result,
};
@@ -95,31 +95,17 @@ pub trait TimeMergeStorage {
pub type TimeMergeStorageRef = Arc<(dyn TimeMergeStorage + Send + Sync)>;
#[derive(Clone)]
-struct StorageRuntimes {
+pub struct StorageRuntimes {
manifest_compact_runtime: Arc<Runtime>,
sst_compact_runtime: Arc<Runtime>,
}
impl StorageRuntimes {
- pub fn new(runtime_opts: RuntimeOptions) -> Result<Self> {
- let manifest_compact_runtime =
tokio::runtime::Builder::new_multi_thread()
- .thread_name("man-compact")
- .worker_threads(runtime_opts.manifest_compact_thread_num)
- .enable_all()
- .build()
- .context("build storgae compact runtime")?;
-
- let sst_compact_runtime = tokio::runtime::Builder::new_multi_thread()
- .thread_name("sst-compact")
- .worker_threads(runtime_opts.sst_compact_thread_num)
- .enable_all()
- .build()
- .context("build storgae compact runtime")?;
-
- Ok(Self {
- manifest_compact_runtime: Arc::new(manifest_compact_runtime),
- sst_compact_runtime: Arc::new(sst_compact_runtime),
- })
+ pub fn new(manifest_compact_runtime: Arc<Runtime>, sst_compact_runtime:
Arc<Runtime>) -> Self {
+ Self {
+ manifest_compact_runtime,
+ sst_compact_runtime,
+ }
}
}
@@ -154,25 +140,15 @@ pub struct CloudObjectStorage {
/// ```
/// `root_path` is composed of `path` and `segment_duration`.
impl CloudObjectStorage {
- pub fn try_new(
+ pub async fn try_new(
path: String,
segment_duration: Duration,
store: ObjectStoreRef,
arrow_schema: SchemaRef,
num_primary_keys: usize,
storage_opts: StorageOptions,
+ runtimes: StorageRuntimes,
) -> Result<Self> {
- let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
- let manifest = runtimes.manifest_compact_runtime.block_on(async {
- Manifest::try_new(
- path.clone(),
- store.clone(),
- runtimes.manifest_compact_runtime.clone(),
- storage_opts.manifest_merge_opts,
- )
- .await
- })?;
- let manifest = Arc::new(manifest);
let schema = {
let value_idxes =
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
ensure!(!value_idxes.is_empty(), "no value column found");
@@ -197,6 +173,14 @@ impl CloudObjectStorage {
update_mode,
}
};
+ let manifest = Manifest::try_new(
+ path.clone(),
+ store.clone(),
+ runtimes.manifest_compact_runtime.clone(),
+ storage_opts.manifest_merge_opts,
+ )
+ .await?;
+ let manifest = Arc::new(manifest);
let write_props = Self::build_write_props(storage_opts.write_opts,
num_primary_keys);
let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone()));
let parquet_reader = Arc::new(ParquetReader::new(
@@ -431,22 +415,30 @@ mod tests {
use super::*;
use crate::{arrow_schema, record_batch, test_util::check_stream,
types::Timestamp};
+ fn build_runtimes() -> StorageRuntimes {
+ let rt = Arc::new(Runtime::new().unwrap());
+ StorageRuntimes::new(rt.clone(), rt)
+ }
+
#[test(test)]
fn test_storage_write_and_scan() {
let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value",
Int64));
let root_dir = temp_dir::TempDir::new().unwrap();
let store = Arc::new(LocalFileSystem::new());
- let storage = CloudObjectStorage::try_new(
- root_dir.path().to_string_lossy().to_string(),
- Duration::from_hours(2),
- store,
- schema.clone(),
- 2, // num_primary_keys
- StorageOptions::default(),
- )
- .unwrap();
+ let runtimes = build_runtimes();
+ runtimes.sst_compact_runtime.clone().block_on(async move {
+ let storage = CloudObjectStorage::try_new(
+ root_dir.path().to_string_lossy().to_string(),
+ Duration::from_hours(2),
+ store,
+ schema.clone(),
+ 2, // num_primary_keys
+ StorageOptions::default(),
+ runtimes,
+ )
+ .await
+ .unwrap();
- storage.runtimes.sst_compact_runtime.block_on(async {
let batch = record_batch!(
("pk1", UInt8, vec![11, 11, 9, 10, 5]),
("pk2", UInt8, vec![100, 100, 1, 2, 3]),
@@ -509,16 +501,19 @@ mod tests {
let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8),
("c", UInt8));
let root_dir = temp_dir::TempDir::new().unwrap();
let store = Arc::new(LocalFileSystem::new());
- let storage = CloudObjectStorage::try_new(
- root_dir.path().to_string_lossy().to_string(),
- Duration::from_hours(2),
- store,
- schema.clone(),
- 1,
- StorageOptions::default(),
- )
- .unwrap();
- storage.runtimes.sst_compact_runtime.block_on(async {
+ let runtimes = build_runtimes();
+ runtimes.sst_compact_runtime.clone().block_on(async move {
+ let storage = CloudObjectStorage::try_new(
+ root_dir.path().to_string_lossy().to_string(),
+ Duration::from_hours(2),
+ store,
+ schema.clone(),
+ 1,
+ StorageOptions::default(),
+ runtimes,
+ )
+ .await
+ .unwrap();
let batch = record_batch!(
("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index 03d75ed4..107a363a 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -164,21 +164,6 @@ impl Default for WriteOptions {
}
}
-#[derive(Debug)]
-pub struct RuntimeOptions {
- pub manifest_compact_thread_num: usize,
- pub sst_compact_thread_num: usize,
-}
-
-impl Default for RuntimeOptions {
- fn default() -> Self {
- Self {
- manifest_compact_thread_num: 2,
- sst_compact_thread_num: 4,
- }
- }
-}
-
#[derive(Debug)]
pub struct ManifestMergeOptions {
pub channel_size: usize,
@@ -211,7 +196,6 @@ pub enum UpdateMode {
pub struct StorageOptions {
pub write_opts: WriteOptions,
pub manifest_merge_opts: ManifestMergeOptions,
- pub runtime_opts: RuntimeOptions,
pub update_mode: UpdateMode,
}
diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml
index 03ad7929..19acaa23 100644
--- a/src/server/Cargo.toml
+++ b/src/server/Cargo.toml
@@ -33,9 +33,14 @@ workspace = true
[dependencies]
actix-web = "4"
arrow = { workspace = true }
+clap = { workspace = true, features = ["derive"] }
+common = { workspace = true }
futures = { workspace = true }
metric_engine = { workspace = true }
object_store = { workspace = true }
+rand = "0.8"
+serde = { workspace = true }
tokio = { workspace = true }
+toml = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
new file mode 100644
index 00000000..41f39d25
--- /dev/null
+++ b/src/server/src/config.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use common::ReadableDuration;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct Config {
+ pub port: u16,
+ pub write_worker_num: usize, // for test
+ pub metric_engine: MetricEngineConfig,
+}
+
+impl Default for Config {
+ fn default() -> Self {
+ Self {
+ port: 5000,
+ write_worker_num: 4,
+ metric_engine: MetricEngineConfig::default(),
+ }
+ }
+}
+
+#[derive(Default, Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct MetricEngineConfig {
+ pub manifest: ManifestConfig,
+ pub sst: SstConfig,
+ pub storage: StorageConfig,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct ManifestConfig {
+ pub background_thread_num: usize,
+}
+
+impl Default for ManifestConfig {
+ fn default() -> Self {
+ Self {
+ background_thread_num: 2,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct SstConfig {
+ pub background_thread_num: usize,
+}
+
+impl Default for SstConfig {
+ fn default() -> Self {
+ Self {
+ background_thread_num: 2,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(tag = "type")]
+#[allow(clippy::large_enum_variant)]
+pub enum StorageConfig {
+ Local(LocalStorageConfig),
+ S3Like(S3LikeStorageConfig),
+}
+
+impl Default for StorageConfig {
+ fn default() -> Self {
+ Self::Local(LocalStorageConfig::default())
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct LocalStorageConfig {
+ pub data_dir: String,
+}
+
+impl Default for LocalStorageConfig {
+ fn default() -> Self {
+ Self {
+ data_dir: "/tmp/horaedb".to_string(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct S3LikeStorageConfig {
+ pub region: String,
+ pub key_id: String,
+ pub key_secret: String,
+ pub endpoint: String,
+ pub bucket: String,
+ pub prefix: String,
+ #[serde(default = "default_max_retries")]
+ pub max_retries: usize,
+ #[serde(default)]
+ pub http: HttpOptions,
+ #[serde(default)]
+ pub timeout: TimeoutOptions,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct HttpOptions {
+ pub pool_max_idle_per_host: usize,
+ pub timeout: ReadableDuration,
+ pub keep_alive_timeout: ReadableDuration,
+ pub keep_alive_interval: ReadableDuration,
+}
+
+impl Default for HttpOptions {
+ fn default() -> Self {
+ Self {
+ pool_max_idle_per_host: 1024,
+ timeout: ReadableDuration::secs(15),
+ keep_alive_timeout: ReadableDuration::secs(10),
+ keep_alive_interval: ReadableDuration::secs(2),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct TimeoutOptions {
+    // Non IO Operation like stat and delete, they operate on a single file, we control them by
+ // setting timeout.
+ pub timeout: ReadableDuration,
+    // IO Operation like read and write, they operate on data directly, we control them by setting
+ // io_timeout.
+ pub io_timeout: ReadableDuration,
+}
+
+impl Default for TimeoutOptions {
+ fn default() -> Self {
+ Self {
+ timeout: ReadableDuration::secs(10),
+ io_timeout: ReadableDuration::secs(10),
+ }
+ }
+}
+
+#[inline]
+fn default_max_retries() -> usize {
+ 3
+}
diff --git a/src/server/src/main.rs b/src/server/src/main.rs
index c02bc939..0644f5a5 100644
--- a/src/server/src/main.rs
+++ b/src/server/src/main.rs
@@ -16,20 +16,36 @@
// under the License.
#![feature(duration_constructors)]
-use std::{sync::Arc, time::Duration};
+mod config;
+use std::{fs, iter::repeat_with, sync::Arc, time::Duration};
use actix_web::{
get,
web::{self, Data},
App, HttpResponse, HttpServer, Responder,
};
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::{
+ array::{Int64Array, RecordBatch},
+ datatypes::{DataType, Field, Schema, SchemaRef},
+};
+use clap::Parser;
+use config::{Config, StorageConfig};
use metric_engine::{
- storage::{CloudObjectStorage, CompactRequest, TimeMergeStorageRef},
- types::StorageOptions,
+ storage::{
+        CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest,
+ },
+ types::{RuntimeRef, StorageOptions},
};
use object_store::local::LocalFileSystem;
-use tracing::info;
+use tracing::{error, info};
+
+#[derive(Parser, Debug)]
+#[command(version, about, long_about)]
+struct Args {
+ /// Config file path
+ #[arg(short, long)]
+ config: String,
+}
#[get("/")]
async fn hello() -> impl Responder {
@@ -48,38 +64,119 @@ struct AppState {
storage: TimeMergeStorageRef,
}
-#[actix_web::main]
-async fn main() -> std::io::Result<()> {
+pub fn main() {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
- let port = 5000;
+ let args = Args::parse();
+    let config_body = fs::read_to_string(args.config).expect("read config file failed");
+ let config: Config = toml::from_str(&config_body).unwrap();
+ info!(config = ?config, "Config loaded");
+
+ let port = config.port;
+ let rt = build_multi_runtime("main", 1);
+ let manifest_compact_runtime = build_multi_runtime(
+ "manifest-compact",
+ config.metric_engine.manifest.background_thread_num,
+ );
+ let sst_compact_runtime = build_multi_runtime(
+ "sst-compact",
+ config.metric_engine.sst.background_thread_num,
+ );
+    let runtimes = StorageRuntimes::new(manifest_compact_runtime, sst_compact_runtime);
+ let storage_config = match config.metric_engine.storage {
+ StorageConfig::Local(v) => v,
+ StorageConfig::S3Like(_) => panic!("S3 not support yet"),
+ };
+ let write_worker_num = config.write_worker_num;
+ let write_rt = build_multi_runtime("write", write_worker_num);
+ let _ = rt.block_on(async move {
+ let store = Arc::new(LocalFileSystem::new());
+ let storage = Arc::new(
+ CloudObjectStorage::try_new(
+ storage_config.data_dir,
+ Duration::from_mins(10),
+ store,
+ build_schema(),
+ 3,
+ StorageOptions::default(),
+ runtimes,
+ )
+ .await
+ .unwrap(),
+ );
+
+ bench_write(write_rt.clone(), write_worker_num, storage.clone());
+
+ let app_state = Data::new(AppState { storage });
+ info!(port, "Start HoraeDB http server...");
+ HttpServer::new(move || {
+ App::new()
+ .app_data(app_state.clone())
+ .service(hello)
+ .service(compact)
+ })
+ .workers(4)
+ .bind(("127.0.0.1", port))
+ .expect("Server bind failed")
+ .run()
+ .await
+ });
+}
+
+fn build_multi_runtime(name: &str, workers: usize) -> RuntimeRef {
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .thread_name(name)
+ .worker_threads(workers)
+ .enable_all()
+ .build()
+ .expect("build tokio runtime");
+
+ Arc::new(rt)
+}
+
+fn build_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("pk1", DataType::Int64, true),
+ Field::new("pk2", DataType::Int64, true),
+ Field::new("pk3", DataType::Int64, true),
+ Field::new("value", DataType::Int64, true),
+ ]))
+}
+
+fn bench_write(rt: RuntimeRef, workers: usize, storage: TimeMergeStorageRef) {
let schema = Arc::new(Schema::new(vec![
Field::new("pk1", DataType::Int64, true),
Field::new("pk2", DataType::Int64, true),
+ Field::new("pk3", DataType::Int64, true),
Field::new("value", DataType::Int64, true),
]));
- let store = Arc::new(LocalFileSystem::new());
- let storage = Arc::new(
- CloudObjectStorage::try_new(
- "/tmp/test".to_string(),
- Duration::from_mins(10),
- store,
- schema,
- 2,
- StorageOptions::default(),
- )
- .unwrap(),
- );
- let app_state = Data::new(AppState { storage });
- info!(port, "Start HoraeDB http server...");
- HttpServer::new(move || {
- App::new()
- .app_data(app_state.clone())
- .service(hello)
- .service(compact)
- })
- .bind(("127.0.0.1", port))?
- .run()
- .await
+ for _ in 0..workers {
+ let storage = storage.clone();
+ let schema = schema.clone();
+ rt.spawn(async move {
+ loop {
+                let pk1: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let pk2: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let pk3: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let value: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+                    vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)],
+ )
+ .unwrap();
+ let now = common::now();
+ if let Err(e) = storage
+ .write(WriteRequest {
+ batch,
+ enable_check: false,
+ time_range: (now..now + 1).into(),
+ })
+ .await
+ {
+ error!("write failed, err:{}", e);
+ }
+ }
+ });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]