This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new d2f878f1 feat: add server config (#1615)
d2f878f1 is described below

commit d2f878f1b718e0b16fe70c8983a3d919c1d8d5a1
Author: Jiacai Liu <[email protected]>
AuthorDate: Fri Dec 20 10:17:18 2024 +0800

    feat: add server config (#1615)
    
    ## Rationale
    
    
    ## Detailed Changes
    - Server can read config from CLI args
    - Start 4 write workers to benchmark writes.
    
    ## Test Plan
    CI
---
 Cargo.lock                                         |  36 +++
 Cargo.toml                                         |  10 +-
 src/server/Cargo.toml => docs/example.toml         |  27 +-
 licenserc.toml                                     |  26 +-
 src/benchmarks/Cargo.toml                          |   1 +
 src/benchmarks/src/config.rs                       |   3 +-
 src/{server => common}/Cargo.toml                  |  12 +-
 .../src/util.rs => common/src/lib.rs}              |  11 +-
 src/common/src/size_ext.rs                         | 295 +++++++++++++++++++++
 src/common/src/time_ext.rs                         | 288 ++++++++++++++++++++
 src/metric_engine/Cargo.toml                       |   1 +
 src/metric_engine/src/compaction/picker.rs         |   3 +-
 src/metric_engine/src/lib.rs                       |   1 -
 src/metric_engine/src/storage.rs                   | 103 ++++---
 src/metric_engine/src/types.rs                     |  16 --
 src/server/Cargo.toml                              |   5 +
 src/server/src/config.rs                           | 159 +++++++++++
 src/server/src/main.rs                             | 159 ++++++++---
 18 files changed, 988 insertions(+), 168 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ac89a9fb..b5c4a8d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -630,6 +630,7 @@ name = "benchmarks"
 version = "2.2.0-alpha"
 dependencies = [
  "bytes",
+ "common",
  "criterion",
  "metric_engine",
  "pb_types",
@@ -859,6 +860,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84"
 dependencies = [
  "clap_builder",
+ "clap_derive",
 ]
 
 [[package]]
@@ -867,8 +869,22 @@ version = "4.5.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838"
 dependencies = [
+ "anstream",
  "anstyle",
  "clap_lex",
+ "strsim",
+]
+
+[[package]]
+name = "clap_derive"
+version = "4.5.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn",
 ]
 
 [[package]]
@@ -894,6 +910,14 @@ dependencies = [
  "unicode-width",
 ]
 
+[[package]]
+name = "common"
+version = "2.2.0-alpha"
+dependencies = [
+ "serde",
+ "toml",
+]
+
 [[package]]
 name = "const-random"
 version = "0.1.18"
@@ -2141,6 +2165,7 @@ dependencies = [
  "byteorder",
  "bytes",
  "bytesize",
+ "common",
  "datafusion",
  "futures",
  "itertools 0.3.25",
@@ -2881,10 +2906,15 @@ version = "2.2.0-alpha"
 dependencies = [
  "actix-web",
  "arrow",
+ "clap",
+ "common",
  "futures",
  "metric_engine",
  "object_store",
+ "rand",
+ "serde",
  "tokio",
+ "toml",
  "tracing",
  "tracing-subscriber",
 ]
@@ -3020,6 +3050,12 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
 
+[[package]]
+name = "strsim"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
+
 [[package]]
 name = "strum"
 version = "0.26.3"
diff --git a/Cargo.toml b/Cargo.toml
index d81bca03..9f5df523 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,11 +23,18 @@ license = "Apache-2.0"
 
 [workspace]
 resolver = "2"
-members = ["src/benchmarks", "src/metric_engine", "src/pb_types", "src/server"]
+members = [
+    "src/benchmarks",
+    "src/common",
+    "src/metric_engine",
+    "src/pb_types",
+    "src/server"
+]
 
 [workspace.dependencies]
 anyhow = { version = "1.0" }
 metric_engine = { path = "src/metric_engine" }
+common = { path = "src/common" }
 thiserror = "1"
 bytes = "1"
 byteorder = "1"
@@ -38,6 +45,7 @@ pb_types = { path = "src/pb_types" }
 prost = { version = "0.13" }
 arrow = { version = "53", features = ["prettyprint"] }
 bytesize = "1"
+clap = "4"
 arrow-schema = "53"
 tokio = { version = "1", features = ["full"] }
 async-trait = "0.1"
diff --git a/src/server/Cargo.toml b/docs/example.toml
similarity index 64%
copy from src/server/Cargo.toml
copy to docs/example.toml
index 03ad7929..0fbb0220 100644
--- a/src/server/Cargo.toml
+++ b/docs/example.toml
@@ -15,27 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[package]
-name = "server"
+port = 5000
 
-[package.license]
-workspace = true
+[metric_engine]
 
-[package.version]
-workspace = true
-
-[package.authors]
-workspace = true
-
-[package.edition]
-workspace = true
-
-[dependencies]
-actix-web = "4"
-arrow = { workspace = true }
-futures = { workspace = true }
-metric_engine = { workspace = true }
-object_store = { workspace = true }
-tokio = { workspace = true }
-tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
+[metric_engine.storage]
+type = "Local"
+data_dir = "/tmp/horaedb-storage"
\ No newline at end of file
diff --git a/licenserc.toml b/licenserc.toml
index 9bace81e..9bc60b63 100644
--- a/licenserc.toml
+++ b/licenserc.toml
@@ -18,28 +18,6 @@
 headerPath = "Apache-2.0-ASF.txt"
 
 excludes = [
-    # Derived
-    "horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go",
-    "horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go",
-    "horaemeta/server/service/http/route.go",
-    "src/server/src/federated.rs",
-    "src/server/src/session.rs",
-    "src/components/skiplist/benches/bench.rs",
-    "src/components/skiplist/src/lib.rs",
-    "src/components/skiplist/src/key.rs",
-    "src/components/skiplist/src/list.rs",
-    "src/components/skiplist/tests/tests.rs",
-    "src/components/skiplist/Cargo.toml",
-    "src/components/size_ext/src/lib.rs",
-    "src/components/future_ext/src/cancel.rs",
-    "src/components/tracing_util/src/lib.rs",
-    "src/components/tracing_util/src/logging.rs",
-    "src/components/tracing_util/Cargo.toml",
-    "DISCLAIMER",
-    "NOTICE",
-    "horaemeta/DEPENDENCIES.csv",
-    "DEPENDENCIES.tsv",
-    # Test files
-    "*snap",
-    "*result"
+    # Forked
+    "src/common/src/size_ext.rs",
 ]
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 788c56a3..077c42dd 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -32,6 +32,7 @@ workspace = true
 
 [dependencies]
 bytes = { workspace = true }
+common = { workspace = true }
 metric_engine = { workspace = true }
 pb_types = { workspace = true }
 prost = { workspace = true }
diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs
index 4da29396..27577f25 100644
--- a/src/benchmarks/src/config.rs
+++ b/src/benchmarks/src/config.rs
@@ -19,10 +19,9 @@
 
 use std::{env, fs};
 
+use common::ReadableDuration;
 use serde::Deserialize;
 use tracing::info;
-
-use crate::util::ReadableDuration;
 const BENCH_CONFIG_PATH_KEY: &str = "BENCH_CONFIG_PATH";
 
 #[derive(Debug, Deserialize)]
diff --git a/src/server/Cargo.toml b/src/common/Cargo.toml
similarity index 78%
copy from src/server/Cargo.toml
copy to src/common/Cargo.toml
index 03ad7929..f8df372e 100644
--- a/src/server/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [package]
-name = "server"
+name = "common"
 
 [package.license]
 workspace = true
@@ -31,11 +31,5 @@ workspace = true
 workspace = true
 
 [dependencies]
-actix-web = "4"
-arrow = { workspace = true }
-futures = { workspace = true }
-metric_engine = { workspace = true }
-object_store = { workspace = true }
-tokio = { workspace = true }
-tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
+serde = { workspace = true }
+toml = { workspace = true }
diff --git a/src/metric_engine/src/util.rs b/src/common/src/lib.rs
similarity index 78%
rename from src/metric_engine/src/util.rs
rename to src/common/src/lib.rs
index f48e8834..eb405e27 100644
--- a/src/metric_engine/src/util.rs
+++ b/src/common/src/lib.rs
@@ -15,11 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::time::{SystemTime, UNIX_EPOCH};
+mod size_ext;
+mod time_ext;
 
-/// Current time in milliseconds.
-pub fn now() -> i64 {
-    let now = SystemTime::now();
-    let duration = now.duration_since(UNIX_EPOCH).unwrap();
-    duration.as_millis() as i64
-}
+pub use size_ext::ReadableSize;
+pub use time_ext::{now, ReadableDuration};
diff --git a/src/common/src/size_ext.rs b/src/common/src/size_ext.rs
new file mode 100644
index 00000000..03c65e86
--- /dev/null
+++ b/src/common/src/size_ext.rs
@@ -0,0 +1,295 @@
+// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
+
+// This module is forked from tikv and remove unnecessary code.
+// https://github.com/tikv/tikv/blob/7ef4aa9549f136cabdac7e5177f39fe16631b514/components/tikv_util/src/config.rs#L76
+use std::{
+    fmt::{self, Write},
+    ops::{Div, Mul},
+    str::{self, FromStr},
+};
+
+use serde::{
+    de::{self, Unexpected, Visitor},
+    Deserialize, Deserializer, Serialize, Serializer,
+};
+
+const UNIT: u64 = 1;
+
+const BINARY_DATA_MAGNITUDE: u64 = 1024;
+pub const B: u64 = UNIT;
+pub const KIB: u64 = B * BINARY_DATA_MAGNITUDE;
+pub const MIB: u64 = KIB * BINARY_DATA_MAGNITUDE;
+pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE;
+pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE;
+pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE;
+
+#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd)]
+pub struct ReadableSize(pub u64);
+
+impl ReadableSize {
+    pub const fn kb(count: u64) -> ReadableSize {
+        ReadableSize(count * KIB)
+    }
+
+    pub const fn mb(count: u64) -> ReadableSize {
+        ReadableSize(count * MIB)
+    }
+
+    pub const fn gb(count: u64) -> ReadableSize {
+        ReadableSize(count * GIB)
+    }
+
+    pub const fn as_mb(self) -> u64 {
+        self.0 / MIB
+    }
+
+    pub const fn as_byte(self) -> u64 {
+        self.0
+    }
+}
+
+impl Div<u64> for ReadableSize {
+    type Output = ReadableSize;
+
+    fn div(self, rhs: u64) -> ReadableSize {
+        ReadableSize(self.0 / rhs)
+    }
+}
+
+impl Div<ReadableSize> for ReadableSize {
+    type Output = u64;
+
+    fn div(self, rhs: ReadableSize) -> u64 {
+        self.0 / rhs.0
+    }
+}
+
+impl Mul<u64> for ReadableSize {
+    type Output = ReadableSize;
+
+    fn mul(self, rhs: u64) -> ReadableSize {
+        ReadableSize(self.0 * rhs)
+    }
+}
+
+impl Serialize for ReadableSize {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let size = self.0;
+        let mut buffer = String::new();
+        if size == 0 {
+            write!(buffer, "{size}KiB").unwrap();
+        } else if size % PIB == 0 {
+            write!(buffer, "{}PiB", size / PIB).unwrap();
+        } else if size % TIB == 0 {
+            write!(buffer, "{}TiB", size / TIB).unwrap();
+        } else if size % GIB == 0 {
+            write!(buffer, "{}GiB", size / GIB).unwrap();
+        } else if size % MIB == 0 {
+            write!(buffer, "{}MiB", size / MIB).unwrap();
+        } else if size % KIB == 0 {
+            write!(buffer, "{}KiB", size / KIB).unwrap();
+        } else {
+            return serializer.serialize_u64(size);
+        }
+        serializer.serialize_str(&buffer)
+    }
+}
+
+impl FromStr for ReadableSize {
+    type Err = String;
+
+    // This method parses value in binary unit.
+    fn from_str(s: &str) -> Result<ReadableSize, String> {
+        let size_str = s.trim();
+        if size_str.is_empty() {
+            return Err(format!("{s:?} is not a valid size."));
+        }
+
+        if !size_str.is_ascii() {
+            return Err(format!("ASCII string is expected, but got {s:?}"));
+        }
+
+        // size: digits and '.' as decimal separator
+        let size_len = size_str
+            .to_string()
+            .chars()
+            .take_while(|c| char::is_ascii_digit(c) || ['.', 'e', 'E', '-', '+'].contains(c))
+            .count();
+
+        // unit: alphabetic characters
+        let (size, unit) = size_str.split_at(size_len);
+
+        let unit = match unit.trim() {
+            "K" | "KB" | "KiB" => KIB,
+            "M" | "MB" | "MiB" => MIB,
+            "G" | "GB" | "GiB" => GIB,
+            "T" | "TB" | "TiB" => TIB,
+            "P" | "PB" | "PiB" => PIB,
+            "B" | "" => UNIT,
+            _ => {
+                return Err(format!(
+                    "only B, KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, and PiB are supported: {s:?}"
+                ));
+            }
+        };
+
+        match size.parse::<f64>() {
+            Ok(n) => Ok(ReadableSize((n * unit as f64) as u64)),
+            Err(_) => Err(format!("invalid size string: {s:?}")),
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for ReadableSize {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        struct SizeVisitor;
+
+        impl Visitor<'_> for SizeVisitor {
+            type Value = ReadableSize;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+                formatter.write_str("valid size")
+            }
+
+            fn visit_i64<E>(self, size: i64) -> Result<ReadableSize, E>
+            where
+                E: de::Error,
+            {
+                if size >= 0 {
+                    self.visit_u64(size as u64)
+                } else {
+                    Err(E::invalid_value(Unexpected::Signed(size), &self))
+                }
+            }
+
+            fn visit_u64<E>(self, size: u64) -> Result<ReadableSize, E>
+            where
+                E: de::Error,
+            {
+                Ok(ReadableSize(size))
+            }
+
+            fn visit_str<E>(self, size_str: &str) -> Result<ReadableSize, E>
+            where
+                E: de::Error,
+            {
+                size_str.parse().map_err(E::custom)
+            }
+        }
+
+        deserializer.deserialize_any(SizeVisitor)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_readable_size() {
+        let s = ReadableSize::kb(2);
+        assert_eq!(s.0, 2048);
+        assert_eq!(s.as_mb(), 0);
+        let s = ReadableSize::mb(2);
+        assert_eq!(s.0, 2 * 1024 * 1024);
+        assert_eq!(s.as_mb(), 2);
+        let s = ReadableSize::gb(2);
+        assert_eq!(s.0, 2 * 1024 * 1024 * 1024);
+        assert_eq!(s.as_mb(), 2048);
+
+        assert_eq!((ReadableSize::mb(2) / 2).0, MIB);
+        assert_eq!((ReadableSize::mb(1) / 2).0, 512 * KIB);
+        assert_eq!(ReadableSize::mb(2) / ReadableSize::kb(1), 2048);
+    }
+
+    #[test]
+    fn test_parse_readable_size() {
+        #[derive(Serialize, Deserialize)]
+        struct SizeHolder {
+            s: ReadableSize,
+        }
+
+        let legal_cases = vec![
+            (0, "0KiB"),
+            (2 * KIB, "2KiB"),
+            (4 * MIB, "4MiB"),
+            (5 * GIB, "5GiB"),
+            (7 * TIB, "7TiB"),
+            (11 * PIB, "11PiB"),
+        ];
+        for (size, exp) in legal_cases {
+            let c = SizeHolder {
+                s: ReadableSize(size),
+            };
+            let res_str = toml::to_string(&c).unwrap();
+            let exp_str = format!("s = {exp:?}\n");
+            assert_eq!(res_str, exp_str);
+            let res_size: SizeHolder = toml::from_str(&exp_str).unwrap();
+            assert_eq!(res_size.s.0, size);
+        }
+
+        let c = SizeHolder {
+            s: ReadableSize(512),
+        };
+        let res_str = toml::to_string(&c).unwrap();
+        assert_eq!(res_str, "s = 512\n");
+        let res_size: SizeHolder = toml::from_str(&res_str).unwrap();
+        assert_eq!(res_size.s.0, c.s.0);
+
+        let decode_cases = vec![
+            (" 0.5 PB", PIB / 2),
+            ("0.5 TB", TIB / 2),
+            ("0.5GB ", GIB / 2),
+            ("0.5MB", MIB / 2),
+            ("0.5KB", KIB / 2),
+            ("0.5P", PIB / 2),
+            ("0.5T", TIB / 2),
+            ("0.5G", GIB / 2),
+            ("0.5M", MIB / 2),
+            ("0.5K", KIB / 2),
+            ("23", 23),
+            ("1", 1),
+            ("1024B", KIB),
+            // units with binary prefixes
+            (" 0.5 PiB", PIB / 2),
+            ("1PiB", PIB),
+            ("0.5 TiB", TIB / 2),
+            ("2 TiB", TIB * 2),
+            ("0.5GiB ", GIB / 2),
+            ("787GiB ", GIB * 787),
+            ("0.5MiB", MIB / 2),
+            ("3MiB", MIB * 3),
+            ("0.5KiB", KIB / 2),
+            ("1 KiB", KIB),
+            // scientific notation
+            ("0.5e6 B", B * 500000),
+            ("0.5E6 B", B * 500000),
+            ("1e6B", B * 1000000),
+            ("8E6B", B * 8000000),
+            ("8e7", B * 80000000),
+            ("1e-1MB", MIB / 10),
+            ("1e+1MB", MIB * 10),
+            ("0e+10MB", 0),
+        ];
+        for (src, exp) in decode_cases {
+            let src = format!("s = {src:?}");
+            let res: SizeHolder = toml::from_str(&src).unwrap();
+            assert_eq!(res.s.0, exp);
+        }
+
+        let illegal_cases = vec![
+            "0.5kb", "0.5kB", "0.5Kb", "0.5k", "0.5g", "b", "gb", "1b", "B", "1K24B", " 5_KB",
+            "4B7", "5M_",
+        ];
+        for src in illegal_cases {
+            let src_str = format!("s = {src:?}");
+            assert!(toml::from_str::<SizeHolder>(&src_str).is_err(), "{}", src);
+        }
+    }
+}
diff --git a/src/common/src/time_ext.rs b/src/common/src/time_ext.rs
new file mode 100644
index 00000000..87b61d84
--- /dev/null
+++ b/src/common/src/time_ext.rs
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    fmt::{self, Write},
+    str::FromStr,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
+
+use serde::{
+    de::{self, Visitor},
+    Deserialize, Deserializer, Serialize, Serializer,
+};
+
+const TIME_MAGNITUDE_1: u64 = 1000;
+const TIME_MAGNITUDE_2: u64 = 60;
+const TIME_MAGNITUDE_3: u64 = 24;
+const UNIT: u64 = 1;
+const MS: u64 = UNIT;
+const SECOND: u64 = MS * TIME_MAGNITUDE_1;
+const MINUTE: u64 = SECOND * TIME_MAGNITUDE_2;
+const HOUR: u64 = MINUTE * TIME_MAGNITUDE_2;
+const DAY: u64 = HOUR * TIME_MAGNITUDE_3;
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)]
+pub struct ReadableDuration(pub Duration);
+
+impl ReadableDuration {
+    pub const fn secs(secs: u64) -> ReadableDuration {
+        ReadableDuration(Duration::from_secs(secs))
+    }
+
+    pub const fn millis(millis: u64) -> ReadableDuration {
+        ReadableDuration(Duration::from_millis(millis))
+    }
+
+    pub const fn minutes(minutes: u64) -> ReadableDuration {
+        ReadableDuration::secs(minutes * 60)
+    }
+
+    pub const fn hours(hours: u64) -> ReadableDuration {
+        ReadableDuration::minutes(hours * 60)
+    }
+
+    pub const fn days(days: u64) -> ReadableDuration {
+        ReadableDuration::hours(days * 24)
+    }
+
+    pub fn as_secs(&self) -> u64 {
+        self.0.as_secs()
+    }
+
+    pub fn as_millis(&self) -> u64 {
+        self.0.as_millis() as u64
+    }
+
+    pub fn is_zero(&self) -> bool {
+        self.0.as_nanos() == 0
+    }
+}
+
+impl From<Duration> for ReadableDuration {
+    fn from(t: Duration) -> ReadableDuration {
+        ReadableDuration(t)
+    }
+}
+
+impl From<ReadableDuration> for Duration {
+    fn from(readable: ReadableDuration) -> Duration {
+        readable.0
+    }
+}
+
+impl FromStr for ReadableDuration {
+    type Err = String;
+
+    fn from_str(dur_str: &str) -> std::result::Result<ReadableDuration, String> {
+        let dur_str = dur_str.trim();
+        if !dur_str.is_ascii() {
+            return Err(format!("unexpected ascii string: {dur_str}"));
+        }
+        let err_msg = "valid duration, only d, h, m, s, ms are supported.".to_owned();
+        let mut left = dur_str.as_bytes();
+        let mut last_unit = DAY + 1;
+        let mut dur = 0f64;
+        while let Some(idx) = left.iter().position(|c| b"dhms".contains(c)) {
+            let (first, second) = left.split_at(idx);
+            let unit = if second.starts_with(b"ms") {
+                left = &left[idx + 2..];
+                MS
+            } else {
+                let u = match second[0] {
+                    b'd' => DAY,
+                    b'h' => HOUR,
+                    b'm' => MINUTE,
+                    b's' => SECOND,
+                    _ => return Err(err_msg),
+                };
+                left = &left[idx + 1..];
+                u
+            };
+            if unit >= last_unit {
+                return Err("d, h, m, s, ms should occur in given order.".to_owned());
+            }
+            // do we need to check 12h360m?
+            let number_str = unsafe { std::str::from_utf8_unchecked(first) };
+            dur += match number_str.trim().parse::<f64>() {
+                Ok(n) => n * unit as f64,
+                Err(_) => return Err(err_msg),
+            };
+            last_unit = unit;
+        }
+        if !left.is_empty() {
+            return Err(err_msg);
+        }
+        if dur.is_sign_negative() {
+            return Err("duration should be positive.".to_owned());
+        }
+        let secs = dur as u64 / SECOND;
+        let millis = (dur as u64 % SECOND) as u32 * 1_000_000;
+        Ok(ReadableDuration(Duration::new(secs, millis)))
+    }
+}
+
+impl fmt::Display for ReadableDuration {
+    #[inline]
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut dur = self.0.as_millis() as u64;
+        let mut written = false;
+        if dur >= DAY {
+            written = true;
+            write!(f, "{}d", dur / DAY)?;
+            dur %= DAY;
+        }
+        if dur >= HOUR {
+            written = true;
+            write!(f, "{}h", dur / HOUR)?;
+            dur %= HOUR;
+        }
+        if dur >= MINUTE {
+            written = true;
+            write!(f, "{}m", dur / MINUTE)?;
+            dur %= MINUTE;
+        }
+        if dur >= SECOND {
+            written = true;
+            write!(f, "{}s", dur / SECOND)?;
+            dur %= SECOND;
+        }
+        if dur > 0 {
+            written = true;
+            write!(f, "{dur}ms")?;
+        }
+        if !written {
+            write!(f, "0s")?;
+        }
+        Ok(())
+    }
+}
+
+impl Serialize for ReadableDuration {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut buffer = String::new();
+        write!(buffer, "{self}").unwrap();
+        serializer.serialize_str(&buffer)
+    }
+}
+
+impl<'de> Deserialize<'de> for ReadableDuration {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        struct DurVisitor;
+
+        impl Visitor<'_> for DurVisitor {
+            type Value = ReadableDuration;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+                formatter.write_str("valid duration")
+            }
+
+            fn visit_str<E>(self, dur_str: &str) -> std::result::Result<ReadableDuration, E>
+            where
+                E: de::Error,
+            {
+                dur_str.parse().map_err(E::custom)
+            }
+        }
+
+        deserializer.deserialize_str(DurVisitor)
+    }
+}
+
+/// Current time in milliseconds.
+pub fn now() -> i64 {
+    let now = SystemTime::now();
+    let duration = now.duration_since(UNIX_EPOCH).unwrap();
+    duration.as_millis() as i64
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_duration_construction() {
+        let mut dur = ReadableDuration::secs(1);
+        assert_eq!(dur.0, Duration::new(1, 0));
+        assert_eq!(dur.as_secs(), 1);
+        assert_eq!(dur.as_millis(), 1000);
+        dur = ReadableDuration::millis(1001);
+        assert_eq!(dur.0, Duration::new(1, 1_000_000));
+        assert_eq!(dur.as_secs(), 1);
+        assert_eq!(dur.as_millis(), 1001);
+        dur = ReadableDuration::minutes(2);
+        assert_eq!(dur.0, Duration::new(2 * 60, 0));
+        assert_eq!(dur.as_secs(), 120);
+        assert_eq!(dur.as_millis(), 120000);
+        dur = ReadableDuration::hours(2);
+        assert_eq!(dur.0, Duration::new(2 * 3600, 0));
+        assert_eq!(dur.as_secs(), 7200);
+        assert_eq!(dur.as_millis(), 7200000);
+    }
+
+    #[test]
+    fn test_parse_readable_duration() {
+        #[derive(Serialize, Deserialize)]
+        struct DurHolder {
+            d: ReadableDuration,
+        }
+
+        let legal_cases = vec![
+            (0, 0, "0s"),
+            (0, 1, "1ms"),
+            (2, 0, "2s"),
+            (24 * 3600, 0, "1d"),
+            (2 * 24 * 3600, 10, "2d10ms"),
+            (4 * 60, 0, "4m"),
+            (5 * 3600, 0, "5h"),
+            (3600 + 2 * 60, 0, "1h2m"),
+            (5 * 24 * 3600 + 3600 + 2 * 60, 0, "5d1h2m"),
+            (3600 + 2, 5, "1h2s5ms"),
+            (3 * 24 * 3600 + 7 * 3600 + 2, 5, "3d7h2s5ms"),
+        ];
+        for (secs, ms, exp) in legal_cases {
+            let d = DurHolder {
+                d: ReadableDuration(Duration::new(secs, ms * 1_000_000)),
+            };
+            let res_str = toml::to_string(&d).unwrap();
+            let exp_str = format!("d = {exp:?}\n");
+            assert_eq!(res_str, exp_str);
+            let res_dur: DurHolder = toml::from_str(&exp_str).unwrap();
+            assert_eq!(res_dur.d.0, d.d.0);
+        }
+
+        let decode_cases = vec![(" 0.5 h2m ", 3600 / 2 + 2 * 60, 0)];
+        for (src, secs, ms) in decode_cases {
+            let src = format!("d = {src:?}");
+            let res: DurHolder = toml::from_str(&src).unwrap();
+            assert_eq!(res.d.0, Duration::new(secs, ms * 1_000_000));
+        }
+
+        let illegal_cases = vec!["1H", "1M", "1S", "1MS", "1h1h", "h"];
+        for src in illegal_cases {
+            let src_str = format!("d = {src:?}");
+            assert!(toml::from_str::<DurHolder>(&src_str).is_err(), "{}", src);
+        }
+        assert!(toml::from_str::<DurHolder>("d = 23").is_err());
+    }
+}
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index ca8dc576..ba6d2daa 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -39,6 +39,7 @@ async-trait = { workspace = true }
 byteorder = { workspace = true }
 bytes = { workspace = true }
 bytesize = { workspace = true }
+common = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
diff --git a/src/metric_engine/src/compaction/picker.rs b/src/metric_engine/src/compaction/picker.rs
index 9107e3ba..28e08688 100644
--- a/src/metric_engine/src/compaction/picker.rs
+++ b/src/metric_engine/src/compaction/picker.rs
@@ -17,9 +17,10 @@
 
 use std::{collections::BTreeMap, time::Duration};
 
+use common::now;
 use tracing::debug;
 
-use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile, types::Timestamp, util::now};
+use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile, types::Timestamp};
 
 pub struct Picker {
     manifest: ManifestRef,
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs
index 3c2334de..2bbcdd6b 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/lib.rs
@@ -29,6 +29,5 @@ pub mod storage;
 #[cfg(test)]
 mod test_util;
 pub mod types;
-pub(crate) mod util;
 
 pub use error::{AnyhowError, Error, Result};
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index e5b6acf0..72107abd 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -55,8 +55,8 @@ use crate::{
     read::ParquetReader,
     sst::{allocate_id, FileMeta, SstPathGenerator},
     types::{
-        ObjectStoreRef, RuntimeOptions, StorageOptions, StorageSchema, TimeRange, WriteOptions,
-        WriteResult, SEQ_COLUMN_NAME,
+        ObjectStoreRef, StorageOptions, StorageSchema, TimeRange, WriteOptions, WriteResult,
+        SEQ_COLUMN_NAME,
     },
     Result,
 };
@@ -95,31 +95,17 @@ pub trait TimeMergeStorage {
 pub type TimeMergeStorageRef = Arc<(dyn TimeMergeStorage + Send + Sync)>;
 
 #[derive(Clone)]
-struct StorageRuntimes {
+pub struct StorageRuntimes {
     manifest_compact_runtime: Arc<Runtime>,
     sst_compact_runtime: Arc<Runtime>,
 }
 
 impl StorageRuntimes {
-    pub fn new(runtime_opts: RuntimeOptions) -> Result<Self> {
-        let manifest_compact_runtime = tokio::runtime::Builder::new_multi_thread()
-            .thread_name("man-compact")
-            .worker_threads(runtime_opts.manifest_compact_thread_num)
-            .enable_all()
-            .build()
-            .context("build storgae compact runtime")?;
-
-        let sst_compact_runtime = tokio::runtime::Builder::new_multi_thread()
-            .thread_name("sst-compact")
-            .worker_threads(runtime_opts.sst_compact_thread_num)
-            .enable_all()
-            .build()
-            .context("build storgae compact runtime")?;
-
-        Ok(Self {
-            manifest_compact_runtime: Arc::new(manifest_compact_runtime),
-            sst_compact_runtime: Arc::new(sst_compact_runtime),
-        })
+    pub fn new(manifest_compact_runtime: Arc<Runtime>, sst_compact_runtime: Arc<Runtime>) -> Self {
+        Self {
+            manifest_compact_runtime,
+            sst_compact_runtime,
+        }
     }
 }
 
@@ -154,25 +140,15 @@ pub struct CloudObjectStorage {
 /// ```
 /// `root_path` is composed of `path` and `segment_duration`.
 impl CloudObjectStorage {
-    pub fn try_new(
+    pub async fn try_new(
         path: String,
         segment_duration: Duration,
         store: ObjectStoreRef,
         arrow_schema: SchemaRef,
         num_primary_keys: usize,
         storage_opts: StorageOptions,
+        runtimes: StorageRuntimes,
     ) -> Result<Self> {
-        let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
-        let manifest = runtimes.manifest_compact_runtime.block_on(async {
-            Manifest::try_new(
-                path.clone(),
-                store.clone(),
-                runtimes.manifest_compact_runtime.clone(),
-                storage_opts.manifest_merge_opts,
-            )
-            .await
-        })?;
-        let manifest = Arc::new(manifest);
         let schema = {
             let value_idxes = (num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
             ensure!(!value_idxes.is_empty(), "no value column found");
@@ -197,6 +173,14 @@ impl CloudObjectStorage {
                 update_mode,
             }
         };
+        let manifest = Manifest::try_new(
+            path.clone(),
+            store.clone(),
+            runtimes.manifest_compact_runtime.clone(),
+            storage_opts.manifest_merge_opts,
+        )
+        .await?;
+        let manifest = Arc::new(manifest);
         let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys);
         let sst_path_gen = Arc::new(SstPathGenerator::new(path.clone()));
         let parquet_reader = Arc::new(ParquetReader::new(
@@ -431,22 +415,30 @@ mod tests {
     use super::*;
     use crate::{arrow_schema, record_batch, test_util::check_stream, types::Timestamp};
 
+    fn build_runtimes() -> StorageRuntimes {
+        let rt = Arc::new(Runtime::new().unwrap());
+        StorageRuntimes::new(rt.clone(), rt)
+    }
+
     #[test(test)]
     fn test_storage_write_and_scan() {
         let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Int64));
         let root_dir = temp_dir::TempDir::new().unwrap();
         let store = Arc::new(LocalFileSystem::new());
-        let storage = CloudObjectStorage::try_new(
-            root_dir.path().to_string_lossy().to_string(),
-            Duration::from_hours(2),
-            store,
-            schema.clone(),
-            2, // num_primary_keys
-            StorageOptions::default(),
-        )
-        .unwrap();
+        let runtimes = build_runtimes();
+        runtimes.sst_compact_runtime.clone().block_on(async move {
+            let storage = CloudObjectStorage::try_new(
+                root_dir.path().to_string_lossy().to_string(),
+                Duration::from_hours(2),
+                store,
+                schema.clone(),
+                2, // num_primary_keys
+                StorageOptions::default(),
+                runtimes,
+            )
+            .await
+            .unwrap();
 
-        storage.runtimes.sst_compact_runtime.block_on(async {
             let batch = record_batch!(
                 ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
                 ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
@@ -509,16 +501,19 @@ mod tests {
         let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8), ("c", UInt8));
         let root_dir = temp_dir::TempDir::new().unwrap();
         let store = Arc::new(LocalFileSystem::new());
-        let storage = CloudObjectStorage::try_new(
-            root_dir.path().to_string_lossy().to_string(),
-            Duration::from_hours(2),
-            store,
-            schema.clone(),
-            1,
-            StorageOptions::default(),
-        )
-        .unwrap();
-        storage.runtimes.sst_compact_runtime.block_on(async {
+        let runtimes = build_runtimes();
+        runtimes.sst_compact_runtime.clone().block_on(async move {
+            let storage = CloudObjectStorage::try_new(
+                root_dir.path().to_string_lossy().to_string(),
+                Duration::from_hours(2),
+                store,
+                schema.clone(),
+                1,
+                StorageOptions::default(),
+                runtimes,
+            )
+            .await
+            .unwrap();
             let batch = record_batch!(
                 ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
                 ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index 03d75ed4..107a363a 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -164,21 +164,6 @@ impl Default for WriteOptions {
     }
 }
 
-#[derive(Debug)]
-pub struct RuntimeOptions {
-    pub manifest_compact_thread_num: usize,
-    pub sst_compact_thread_num: usize,
-}
-
-impl Default for RuntimeOptions {
-    fn default() -> Self {
-        Self {
-            manifest_compact_thread_num: 2,
-            sst_compact_thread_num: 4,
-        }
-    }
-}
-
 #[derive(Debug)]
 pub struct ManifestMergeOptions {
     pub channel_size: usize,
@@ -211,7 +196,6 @@ pub enum UpdateMode {
 pub struct StorageOptions {
     pub write_opts: WriteOptions,
     pub manifest_merge_opts: ManifestMergeOptions,
-    pub runtime_opts: RuntimeOptions,
     pub update_mode: UpdateMode,
 }
 
diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml
index 03ad7929..19acaa23 100644
--- a/src/server/Cargo.toml
+++ b/src/server/Cargo.toml
@@ -33,9 +33,14 @@ workspace = true
 [dependencies]
 actix-web = "4"
 arrow = { workspace = true }
+clap = { workspace = true, features = ["derive"] }
+common = { workspace = true }
 futures = { workspace = true }
 metric_engine = { workspace = true }
 object_store = { workspace = true }
+rand = "0.8"
+serde = { workspace = true }
 tokio = { workspace = true }
+toml = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
new file mode 100644
index 00000000..41f39d25
--- /dev/null
+++ b/src/server/src/config.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use common::ReadableDuration;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct Config {
+    pub port: u16,
+    pub write_worker_num: usize, // for test
+    pub metric_engine: MetricEngineConfig,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            port: 5000,
+            write_worker_num: 4,
+            metric_engine: MetricEngineConfig::default(),
+        }
+    }
+}
+
+#[derive(Default, Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct MetricEngineConfig {
+    pub manifest: ManifestConfig,
+    pub sst: SstConfig,
+    pub storage: StorageConfig,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct ManifestConfig {
+    pub background_thread_num: usize,
+}
+
+impl Default for ManifestConfig {
+    fn default() -> Self {
+        Self {
+            background_thread_num: 2,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(default)]
+pub struct SstConfig {
+    pub background_thread_num: usize,
+}
+
+impl Default for SstConfig {
+    fn default() -> Self {
+        Self {
+            background_thread_num: 2,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(tag = "type")]
+#[allow(clippy::large_enum_variant)]
+pub enum StorageConfig {
+    Local(LocalStorageConfig),
+    S3Like(S3LikeStorageConfig),
+}
+
+impl Default for StorageConfig {
+    fn default() -> Self {
+        Self::Local(LocalStorageConfig::default())
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct LocalStorageConfig {
+    pub data_dir: String,
+}
+
+impl Default for LocalStorageConfig {
+    fn default() -> Self {
+        Self {
+            data_dir: "/tmp/horaedb".to_string(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct S3LikeStorageConfig {
+    pub region: String,
+    pub key_id: String,
+    pub key_secret: String,
+    pub endpoint: String,
+    pub bucket: String,
+    pub prefix: String,
+    #[serde(default = "default_max_retries")]
+    pub max_retries: usize,
+    #[serde(default)]
+    pub http: HttpOptions,
+    #[serde(default)]
+    pub timeout: TimeoutOptions,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct HttpOptions {
+    pub pool_max_idle_per_host: usize,
+    pub timeout: ReadableDuration,
+    pub keep_alive_timeout: ReadableDuration,
+    pub keep_alive_interval: ReadableDuration,
+}
+
+impl Default for HttpOptions {
+    fn default() -> Self {
+        Self {
+            pool_max_idle_per_host: 1024,
+            timeout: ReadableDuration::secs(15),
+            keep_alive_timeout: ReadableDuration::secs(10),
+            keep_alive_interval: ReadableDuration::secs(2),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct TimeoutOptions {
+    // Non IO Operation like stat and delete, they operate on a single file, we control them by
+    // setting timeout.
+    pub timeout: ReadableDuration,
+    // IO Operation like read and write, they operate on data directly, we control them by setting
+    // io_timeout.
+    pub io_timeout: ReadableDuration,
+}
+
+impl Default for TimeoutOptions {
+    fn default() -> Self {
+        Self {
+            timeout: ReadableDuration::secs(10),
+            io_timeout: ReadableDuration::secs(10),
+        }
+    }
+}
+
+#[inline]
+fn default_max_retries() -> usize {
+    3
+}
diff --git a/src/server/src/main.rs b/src/server/src/main.rs
index c02bc939..0644f5a5 100644
--- a/src/server/src/main.rs
+++ b/src/server/src/main.rs
@@ -16,20 +16,36 @@
 // under the License.
 
 #![feature(duration_constructors)]
-use std::{sync::Arc, time::Duration};
+mod config;
+use std::{fs, iter::repeat_with, sync::Arc, time::Duration};
 
 use actix_web::{
     get,
     web::{self, Data},
     App, HttpResponse, HttpServer, Responder,
 };
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::{
+    array::{Int64Array, RecordBatch},
+    datatypes::{DataType, Field, Schema, SchemaRef},
+};
+use clap::Parser;
+use config::{Config, StorageConfig};
 use metric_engine::{
-    storage::{CloudObjectStorage, CompactRequest, TimeMergeStorageRef},
-    types::StorageOptions,
+    storage::{
+        CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest,
+    },
+    types::{RuntimeRef, StorageOptions},
 };
 use object_store::local::LocalFileSystem;
-use tracing::info;
+use tracing::{error, info};
+
+#[derive(Parser, Debug)]
+#[command(version, about, long_about)]
+struct Args {
+    /// Config file path
+    #[arg(short, long)]
+    config: String,
+}
 
 #[get("/")]
 async fn hello() -> impl Responder {
@@ -48,38 +64,119 @@ struct AppState {
     storage: TimeMergeStorageRef,
 }
 
-#[actix_web::main]
-async fn main() -> std::io::Result<()> {
+pub fn main() {
     // install global collector configured based on RUST_LOG env var.
     tracing_subscriber::fmt::init();
 
-    let port = 5000;
+    let args = Args::parse();
+    let config_body = fs::read_to_string(args.config).expect("read config file failed");
+    let config: Config = toml::from_str(&config_body).unwrap();
+    info!(config = ?config, "Config loaded");
+
+    let port = config.port;
+    let rt = build_multi_runtime("main", 1);
+    let manifest_compact_runtime = build_multi_runtime(
+        "manifest-compact",
+        config.metric_engine.manifest.background_thread_num,
+    );
+    let sst_compact_runtime = build_multi_runtime(
+        "sst-compact",
+        config.metric_engine.sst.background_thread_num,
+    );
+    let runtimes = StorageRuntimes::new(manifest_compact_runtime, sst_compact_runtime);
+    let storage_config = match config.metric_engine.storage {
+        StorageConfig::Local(v) => v,
+        StorageConfig::S3Like(_) => panic!("S3 not support yet"),
+    };
+    let write_worker_num = config.write_worker_num;
+    let write_rt = build_multi_runtime("write", write_worker_num);
+    let _ = rt.block_on(async move {
+        let store = Arc::new(LocalFileSystem::new());
+        let storage = Arc::new(
+            CloudObjectStorage::try_new(
+                storage_config.data_dir,
+                Duration::from_mins(10),
+                store,
+                build_schema(),
+                3,
+                StorageOptions::default(),
+                runtimes,
+            )
+            .await
+            .unwrap(),
+        );
+
+        bench_write(write_rt.clone(), write_worker_num, storage.clone());
+
+        let app_state = Data::new(AppState { storage });
+        info!(port, "Start HoraeDB http server...");
+        HttpServer::new(move || {
+            App::new()
+                .app_data(app_state.clone())
+                .service(hello)
+                .service(compact)
+        })
+        .workers(4)
+        .bind(("127.0.0.1", port))
+        .expect("Server bind failed")
+        .run()
+        .await
+    });
+}
+
+fn build_multi_runtime(name: &str, workers: usize) -> RuntimeRef {
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .thread_name(name)
+        .worker_threads(workers)
+        .enable_all()
+        .build()
+        .expect("build tokio runtime");
+
+    Arc::new(rt)
+}
+
+fn build_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("pk1", DataType::Int64, true),
+        Field::new("pk2", DataType::Int64, true),
+        Field::new("pk3", DataType::Int64, true),
+        Field::new("value", DataType::Int64, true),
+    ]))
+}
+
+fn bench_write(rt: RuntimeRef, workers: usize, storage: TimeMergeStorageRef) {
     let schema = Arc::new(Schema::new(vec![
         Field::new("pk1", DataType::Int64, true),
         Field::new("pk2", DataType::Int64, true),
+        Field::new("pk3", DataType::Int64, true),
         Field::new("value", DataType::Int64, true),
     ]));
-    let store = Arc::new(LocalFileSystem::new());
-    let storage = Arc::new(
-        CloudObjectStorage::try_new(
-            "/tmp/test".to_string(),
-            Duration::from_mins(10),
-            store,
-            schema,
-            2,
-            StorageOptions::default(),
-        )
-        .unwrap(),
-    );
-    let app_state = Data::new(AppState { storage });
-    info!(port, "Start HoraeDB http server...");
-    HttpServer::new(move || {
-        App::new()
-            .app_data(app_state.clone())
-            .service(hello)
-            .service(compact)
-    })
-    .bind(("127.0.0.1", port))?
-    .run()
-    .await
+    for _ in 0..workers {
+        let storage = storage.clone();
+        let schema = schema.clone();
+        rt.spawn(async move {
+            loop {
+                let pk1: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let pk2: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let pk3: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let value: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let batch = RecordBatch::try_new(
+                    schema.clone(),
+                    vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)],
+                )
+                .unwrap();
+                let now = common::now();
+                if let Err(e) = storage
+                    .write(WriteRequest {
+                        batch,
+                        enable_check: false,
+                        time_range: (now..now + 1).into(),
+                    })
+                    .await
+                {
+                    error!("write failed, err:{}", e);
+                }
+            }
+        });
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to