This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 0cee69fa feat: add wal replay benchmark (#1511)
0cee69fa is described below
commit 0cee69fa4dad05cd1d188497daf7a5d6072a1d76
Author: MianChen <[email protected]>
AuthorDate: Wed Apr 17 18:51:09 2024 +0800
feat: add wal replay benchmark (#1511)
---
Cargo.lock | 1 +
src/benchmarks/Cargo.toml | 2 +
src/benchmarks/bench.toml | 4 +
src/benchmarks/benches/bench.rs | 22 +-
src/benchmarks/config/bench.toml | 5 +
src/benchmarks/src/config.rs | 8 +
src/benchmarks/src/lib.rs | 2 +
src/benchmarks/src/replay_bench.rs | 97 +++++++++
src/benchmarks/src/table.rs | 246 +++++++++++++++++++++
src/benchmarks/src/util.rs | 423 ++++++++++++++++++++++++++++++++++++-
10 files changed, 804 insertions(+), 6 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 24230bde..c8e331b0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -939,6 +939,7 @@ dependencies = [
"snafu 0.6.10",
"table_engine",
"table_kv",
+ "tempfile",
"time_ext",
"tokio",
"toml_ext",
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 843e2fc7..031d0723 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -54,6 +54,7 @@ size_ext = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
table_kv = { workspace = true }
+tempfile = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
toml_ext = { workspace = true }
@@ -63,6 +64,7 @@ zstd = { workspace = true }
[dev-dependencies]
criterion = { workspace = true }
+tempfile = { workspace = true }
[[bench]]
name = "bench"
diff --git a/src/benchmarks/bench.toml b/src/benchmarks/bench.toml
index 9e9e0331..b76f779f 100644
--- a/src/benchmarks/bench.toml
+++ b/src/benchmarks/bench.toml
@@ -64,3 +64,7 @@ bench_measurement_time = "60s"
bench_sample_size = 60
batch_size = 512
value_size = 1024
+
+# Settings of the WAL replay benchmark. `ReplayConfig` declares all three keys
+# as required, so `batch_size` must be present here as well.
+[replay_bench]
+bench_measurement_time = "3s"
+bench_sample_size = 10
+batch_size = 10000
\ No newline at end of file
diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs
index 7f5f5347..cb5d76ed 100644
--- a/src/benchmarks/benches/bench.rs
+++ b/src/benchmarks/benches/bench.rs
@@ -17,13 +17,14 @@
//! Benchmarks
-use std::sync::Once;
+use std::{cell::RefCell, sync::Once};
use benchmarks::{
config::{self, BenchConfig},
merge_memtable_bench::MergeMemTableBench,
merge_sst_bench::MergeSstBench,
parquet_bench::ParquetBench,
+ replay_bench::ReplayBench,
scan_memtable_bench::ScanMemTableBench,
sst_bench::SstBench,
wal_write_bench::WalWriteBench,
@@ -208,6 +209,24 @@ fn bench_wal_write(c: &mut Criterion) {
group.finish();
}
+/// Criterion routine of the replay benchmark.
+///
+/// The shared bench is borrowed mutably once, before measuring starts, and
+/// `run_bench` is then invoked once per criterion iteration.
+fn bench_replay_iter(b: &mut Bencher<'_>, bench: &RefCell<ReplayBench>) {
+    let mut replay = bench.borrow_mut();
+    b.iter(|| replay.run_bench());
+}
+
+/// Registers the `replay` benchmark group.
+///
+/// Measurement time and sample size come from the `replay_bench` part of the
+/// bench config; the rest of that config is handed to the `ReplayBench`
+/// fixture.
+fn bench_replay(c: &mut Criterion) {
+    let config = init_bench();
+    let replay_config = config.replay_bench;
+    let measurement_time = replay_config.bench_measurement_time.0;
+    let sample_size = replay_config.bench_sample_size;
+
+    let mut group = c.benchmark_group("replay");
+    group.measurement_time(measurement_time);
+    group.sample_size(sample_size);
+
+    // `RefCell` lets the `&`-only benchmark input still run the `&mut self` bench.
+    let bench = RefCell::new(ReplayBench::new(replay_config));
+    group.bench_with_input(BenchmarkId::new("replay", 0), &bench, bench_replay_iter);
+    group.finish();
+}
+
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100,
Output::Flamegraph(None)));
@@ -217,6 +236,7 @@ criterion_group!(
bench_scan_memtable,
bench_merge_memtable,
bench_wal_write,
+ bench_replay,
);
criterion_main!(benches);
diff --git a/src/benchmarks/config/bench.toml b/src/benchmarks/config/bench.toml
index 41a08af7..ad66fdea 100644
--- a/src/benchmarks/config/bench.toml
+++ b/src/benchmarks/config/bench.toml
@@ -71,3 +71,8 @@ bench_measurement_time = "60s"
bench_sample_size = 60
batch_size = 512
value_size = 1024
+
+[replay_bench]
+bench_measurement_time = "3s"
+bench_sample_size = 10
+batch_size = 10000
diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs
index b90a7cb5..493eb7c6 100644
--- a/src/benchmarks/src/config.rs
+++ b/src/benchmarks/src/config.rs
@@ -38,6 +38,7 @@ pub struct BenchConfig {
pub scan_memtable_bench: ScanMemTableBenchConfig,
pub merge_memtable_bench: MergeMemTableBenchConfig,
pub wal_write_bench: WalWriteBenchConfig,
+ pub replay_bench: ReplayConfig,
}
// TODO(yingwen): Maybe we can use layze static to load config first.
@@ -147,3 +148,10 @@ pub struct WalWriteBenchConfig {
pub batch_size: usize,
pub value_size: usize,
}
+
+/// Settings of the replay benchmark, deserialized as the `replay_bench` field
+/// of `BenchConfig`. No field has a default, so every key must be present.
+#[derive(Deserialize)]
+pub struct ReplayConfig {
+ /// Measurement time handed to the criterion benchmark group.
+ pub bench_measurement_time: ReadableDuration,
+ /// Sample size handed to the criterion benchmark group.
+ pub bench_sample_size: usize,
+ /// Number of rows generated for the writes of one benchmark iteration.
+ pub batch_size: usize,
+}
diff --git a/src/benchmarks/src/lib.rs b/src/benchmarks/src/lib.rs
index 21142dd2..ffc098c5 100644
--- a/src/benchmarks/src/lib.rs
+++ b/src/benchmarks/src/lib.rs
@@ -23,9 +23,11 @@ pub mod config;
pub mod merge_memtable_bench;
pub mod merge_sst_bench;
pub mod parquet_bench;
+pub mod replay_bench;
pub mod scan_memtable_bench;
pub mod sst_bench;
pub mod sst_tools;
+pub mod table;
pub mod util;
pub mod wal_write_bench;
diff --git a/src/benchmarks/src/replay_bench.rs
b/src/benchmarks/src/replay_bench.rs
new file mode 100644
index 00000000..bf2c9a88
--- /dev/null
+++ b/src/benchmarks/src/replay_bench.rs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Replay bench.
+
+use std::sync::Arc;
+
+use analytic_engine::RecoverMode;
+use runtime::Runtime;
+use util::{OpenTablesMethod, RocksDBEngineBuildContext, TestContext, TestEnv};
+use wal::rocksdb_impl::manager::RocksDBWalsOpener;
+
+use crate::{config::ReplayConfig, table::FixedSchemaTable, util};
+
+/// Benchmark fixture that writes rows to its tables and then reopens the
+/// engine with them.
+pub struct ReplayBench {
+ /// Runtime that `run_bench` blocks on.
+ runtime: Arc<Runtime>,
+ /// Engine context holding the opened tables.
+ test_ctx: TestContext<RocksDBWalsOpener>,
+ /// Table handle used to generate rows and to build row groups.
+ table: FixedSchemaTable,
+ /// Number of rows generated per iteration.
+ batch_size: usize,
+}
+
+impl ReplayBench {
+    /// Builds the fixture: opens an engine on a RocksDB WAL and creates three
+    /// fixed-schema tables in it.
+    ///
+    /// Only the handle of the first table is kept; it is later used to
+    /// generate the rows written to every table. Setup failures panic.
+    pub fn new(config: ReplayConfig) -> Self {
+        let runtime = util::new_runtime(1);
+        let engine_context = RocksDBEngineBuildContext::new(
+            RecoverMode::TableBased,
+            OpenTablesMethod::WithOpenShard,
+        );
+        let env: TestEnv = TestEnv::builder().build();
+
+        let (test_ctx, table) = env.block_on(async {
+            let mut ctx = env.new_context(&engine_context);
+            ctx.open().await;
+
+            let first_table = ctx.create_fixed_schema_table("test_replay_table1").await;
+            // The other tables only need to exist in the engine; their handles are dropped.
+            for name in ["test_replay_table2", "test_replay_table3"] {
+                ctx.create_fixed_schema_table(name).await;
+            }
+
+            (ctx, first_table)
+        });
+
+        ReplayBench {
+            runtime: Arc::new(runtime),
+            test_ctx,
+            table,
+            batch_size: config.batch_size,
+        }
+    }
+
+    /// Runs one benchmark iteration.
+    ///
+    /// Regenerates `batch_size` rows, writes them to every table currently
+    /// open in the context, then closes the engine and reopens it with the
+    /// same tables. Both the writes and the reopen are part of the iteration.
+    pub fn run_bench(&mut self) {
+        self.runtime.block_on(async {
+            self.table.prepare_write_requests(self.batch_size);
+            let rows = self.table.row_tuples();
+
+            // Snapshot the table names up front; the same list drives the reopen below.
+            let table_names: Vec<String> =
+                self.test_ctx.name_to_tables().keys().cloned().collect();
+
+            // Write data to table.
+            for name in &table_names {
+                let row_group = self.table.rows_to_row_group(&rows);
+                self.test_ctx.write_to_table(name, row_group).await;
+            }
+
+            // Reopen db.
+            let name_refs: Vec<&str> = table_names.iter().map(String::as_str).collect();
+            self.test_ctx.reopen_with_tables(&name_refs).await;
+        });
+    }
+}
diff --git a/src/benchmarks/src/table.rs b/src/benchmarks/src/table.rs
new file mode 100644
index 00000000..31df4234
--- /dev/null
+++ b/src/benchmarks/src/table.rs
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utils to create table.
+
+use std::collections::HashMap;
+
+use common_types::{
+ column_schema,
+ datum::{Datum, DatumKind},
+ row::{Row, RowGroup},
+ schema::{self, Schema},
+ table::DEFAULT_SHARD_ID,
+ time::Timestamp,
+};
+use table_engine::{
+ self,
+ engine::{CreateTableParams, CreateTableRequest, TableState},
+ table::{SchemaId, TableId, TableSeq},
+};
+use time_ext::ReadableDuration;
+
+use crate::util::start_ms;
+
+/// Builds a [`Row`] out of a six-element tuple.
+///
+/// Every element is converted into a [`Datum`]; the datums keep the order of
+/// the tuple elements.
+pub fn new_row_6<C0, C1, C2, C3, C4, C5>(data: (C0, C1, C2, C3, C4, C5)) -> Row
+where
+    C0: Into<Datum>,
+    C1: Into<Datum>,
+    C2: Into<Datum>,
+    C3: Into<Datum>,
+    C4: Into<Datum>,
+    C5: Into<Datum>,
+{
+    let (c0, c1, c2, c3, c4, c5) = data;
+
+    Row::from_datums(vec![
+        c0.into(),
+        c1.into(),
+        c2.into(),
+        c3.into(),
+        c4.into(),
+        c5.into(),
+    ])
+}
+
+/// Owned data of one row. The element kinds line up with the columns of the
+/// default schema: `key`, `ts`, `string_tag`, `double_field1`,
+/// `double_field2`, `string_field2`.
+pub type WriteRequestTuple = (String, Timestamp, String, f64, f64, String);
+/// Borrowed counterpart of [`WriteRequestTuple`], holding `&str` instead of `String`.
+pub type RowTuple<'a> = (&'a str, Timestamp, &'a str, f64, f64, &'a str);
+
+/// Combines a schema id and a table sequence into a [`TableId`].
+///
+/// Panics if `TableId::with_seq` does not yield an id for the pair.
+pub fn new_table_id(schema_id: u16, table_seq: u32) -> TableId {
+    let schema = SchemaId::from(schema_id);
+    let seq = TableSeq::from(table_seq);
+
+    TableId::with_seq(schema, seq).unwrap()
+}
+
+// NOTE(review): this struct has no fields, and no impl or use of it is visible
+// in this change — possibly a leftover; confirm before relying on it.
+pub struct RowTupleGenerator {}
+
+/// A table using the fixed default schema, together with a reusable buffer of
+/// generated rows.
+pub struct FixedSchemaTable {
+ /// Request used to create the table in the engine.
+ create_request: CreateTableRequest,
+ /// Rows produced by the most recent `prepare_write_requests` call.
+ write_requests: Vec<WriteRequestTuple>,
+}
+
+impl FixedSchemaTable {
+    /// Starts a [`Builder`] initialised with the default create-table request.
+    pub fn builder() -> Builder {
+        Default::default()
+    }
+
+    /// Builds the fixed schema; panics if the schema builder fails.
+    fn default_schema() -> Schema {
+        let schema_builder = Self::default_schema_builder();
+        schema_builder.build().unwrap()
+    }
+
+    /// Returns a schema builder holding the key columns `key` and `ts`
+    /// followed by four normal columns.
+    pub fn default_schema_builder() -> schema::Builder {
+        // Key columns
+        let key_columns = [("key", DatumKind::String), ("ts", DatumKind::Timestamp)];
+        // Normal columns
+        let normal_columns = [
+            ("string_tag", DatumKind::String),
+            ("double_field1", DatumKind::Double),
+            ("double_field2", DatumKind::Double),
+            ("string_field2", DatumKind::String),
+        ];
+
+        create_schema_builder(&key_columns, &normal_columns)
+    }
+
+    /// Id of the table, as stored in the create request.
+    #[inline]
+    pub fn table_id(&self) -> TableId {
+        self.create_request.table_id
+    }
+
+    /// The request used to create this table.
+    #[inline]
+    pub fn create_request(&self) -> &CreateTableRequest {
+        &self.create_request
+    }
+
+    /// Converts one row tuple into a [`Row`].
+    fn new_row(data: RowTuple) -> Row {
+        new_row_6(data)
+    }
+
+    /// Converts the tuples into rows and wraps them in a [`RowGroup`] that
+    /// carries this table's schema. Panics if the row group is rejected.
+    pub fn rows_to_row_group(&self, data: &[RowTuple]) -> RowGroup {
+        let rows: Vec<Row> = data.iter().map(|tuple| Self::new_row(*tuple)).collect();
+
+        self.new_row_group(rows)
+    }
+
+    /// Wraps `rows` in a [`RowGroup`] with a clone of the table schema.
+    fn new_row_group(&self, rows: Vec<Row>) -> RowGroup {
+        let schema = self.create_request.params.table_schema.clone();
+        RowGroup::try_new(schema, rows).unwrap()
+    }
+
+    /// Replaces the buffered rows with `batch_size` freshly generated ones.
+    ///
+    /// Row `idx` gets the timestamp `start_ms() + idx` and strings suffixed
+    /// with `idx`; the two double fields are constant.
+    pub fn prepare_write_requests(&mut self, batch_size: usize) {
+        let base_ms = start_ms();
+        self.write_requests.clear();
+        self.write_requests.extend((0..batch_size).map(|idx| {
+            (
+                format!("key_{idx}"),
+                Timestamp::new(base_ms + idx as i64),
+                format!("tag1_{idx}"),
+                11.0,
+                110.0,
+                format!("tag2_{idx}"),
+            )
+        }));
+    }
+
+    /// Returns borrowed views of the buffered rows, in buffer order.
+    pub fn row_tuples(&self) -> Vec<RowTuple> {
+        let mut tuples = Vec::with_capacity(self.write_requests.len());
+        for (key, ts, tag, field1, field2, field3) in &self.write_requests {
+            tuples.push((
+                key.as_str(),
+                *ts,
+                tag.as_str(),
+                *field1,
+                *field2,
+                field3.as_str(),
+            ));
+        }
+
+        tuples
+    }
+}
+
+/// Builder of [`FixedSchemaTable`]; it accumulates the create-table request
+/// that the built table will carry.
+#[must_use]
+pub struct Builder {
+ create_request: CreateTableRequest,
+}
+
+impl Builder {
+    /// Sets the schema id of the create request.
+    pub fn schema_id(mut self, schema_id: SchemaId) -> Self {
+        self.create_request.schema_id = schema_id;
+        self
+    }
+
+    /// Sets the table name of the create request.
+    pub fn table_name(mut self, table_name: String) -> Self {
+        self.create_request.params.table_name = table_name;
+        self
+    }
+
+    /// Sets the table id of the create request.
+    pub fn table_id(mut self, table_id: TableId) -> Self {
+        self.create_request.table_id = table_id;
+        self
+    }
+
+    /// Stores the TTL switch as a table option.
+    pub fn enable_ttl(self, enable_ttl: bool) -> Self {
+        self.with_option(
+            common_types::OPTION_KEY_ENABLE_TTL.to_string(),
+            enable_ttl.to_string(),
+        )
+    }
+
+    /// Stores the TTL duration as a table option.
+    pub fn ttl(self, duration: ReadableDuration) -> Self {
+        self.with_option(common_types::TTL.to_string(), duration.to_string())
+    }
+
+    /// Finishes the builder; the table starts with an empty row buffer.
+    pub fn build_fixed(self) -> FixedSchemaTable {
+        let Builder { create_request } = self;
+
+        FixedSchemaTable {
+            create_request,
+            write_requests: Vec::new(),
+        }
+    }
+
+    /// Inserts one entry into the table options, replacing an existing value.
+    fn with_option(mut self, key: String, value: String) -> Self {
+        self.create_request.params.table_options.insert(key, value);
+        self
+    }
+}
+
+impl Default for Builder {
+    /// Default request: table `test_table` in catalog `horaedb`, schema
+    /// `public`, using the fixed default schema, no partitioning and no table
+    /// options, with schema id 2 and the table id built from `(2, 1)`.
+    fn default() -> Self {
+        let create_request = CreateTableRequest {
+            params: CreateTableParams {
+                catalog_name: "horaedb".to_string(),
+                schema_name: "public".to_string(),
+                table_name: "test_table".to_string(),
+                table_schema: FixedSchemaTable::default_schema(),
+                partition_info: None,
+                engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
+                table_options: HashMap::new(),
+            },
+            schema_id: SchemaId::from_u32(2),
+            table_id: new_table_id(2, 1),
+            state: TableState::Stable,
+            shard_id: DEFAULT_SHARD_ID,
+        };
+
+        Self { create_request }
+    }
+}
+
+/// Creates a schema builder holding the given columns.
+///
+/// Format of both input slices: `&[ ( column name, column type ) ]`.
+/// Key columns are added first, as non-nullable, and their positions become
+/// the primary key indexes; normal columns follow as nullable.
+///
+/// Panics if `key_tuples` is empty or if a column cannot be built or added.
+pub fn create_schema_builder(
+    key_tuples: &[(&str, DatumKind)],
+    normal_tuples: &[(&str, DatumKind)],
+) -> schema::Builder {
+    assert!(!key_tuples.is_empty());
+
+    let num_keys = key_tuples.len();
+    let mut builder = schema::Builder::with_capacity(num_keys + normal_tuples.len())
+        .auto_increment_column_id(true)
+        .primary_key_indexes((0..num_keys).collect());
+
+    for &(name, kind) in key_tuples {
+        // Key column is not nullable.
+        let column = column_schema::Builder::new(name.to_string(), kind)
+            .is_nullable(false)
+            .build()
+            .expect("Should succeed to build key column schema");
+        builder = builder.add_key_column(column).unwrap();
+    }
+
+    for &(name, kind) in normal_tuples {
+        let column = column_schema::Builder::new(name.to_string(), kind)
+            .is_nullable(true)
+            .build()
+            .expect("Should succeed to build normal column schema");
+        builder = builder.add_normal_column(column).unwrap();
+    }
+
+    builder
+}
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 188081d8..cb6d8de9 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -17,10 +17,11 @@
//! Utilities.
-use std::sync::Arc;
+use std::{collections::HashMap, future::Future, sync::Arc};
use analytic_engine::{
memtable::{key::KeySequence, MemTableRef, PutContext},
+ setup::{EngineBuilder, TableEngineContext},
space::SpaceId,
sst::{
factory::{
@@ -34,19 +35,42 @@ use analytic_engine::{
},
table::sst_util,
table_options::StorageFormat,
+ Config, RecoverMode,
};
use bytes_ext::{BufMut, SafeBufMut};
use common_types::{
projected_schema::{ProjectedSchema, RowProjectorBuilder},
+ record_batch::RecordBatch,
+ row::RowGroup,
schema::{IndexInWriterSchema, Schema},
+ table::{ShardId, DEFAULT_SHARD_ID},
+ time::Timestamp,
};
+use futures::stream::StreamExt;
use macros::define_result;
-use object_store::{ObjectStoreRef, Path};
+use object_store::{
+ config::{LocalOptions, ObjectStoreOptions, StorageOptions},
+ ObjectStoreRef, Path,
+};
use parquet::file::footer;
-use runtime::Runtime;
+use runtime::{PriorityRuntime, Runtime};
+use size_ext::ReadableSize;
use snafu::{ResultExt, Snafu};
-use table_engine::{predicate::Predicate, table::TableId};
-use wal::log_batch::Payload;
+use table_engine::{
+ engine::{CreateTableRequest, EngineRuntimes, OpenShardRequest, TableDef,
TableEngineRef},
+ predicate::Predicate,
+ table::{ReadRequest, SchemaId, TableId, TableRef, WriteRequest},
+};
+use tempfile::TempDir;
+use time_ext::ReadableDuration;
+use wal::{
+ config::{Config as WalConfig, StorageConfig},
+ log_batch::Payload,
+ manager::{OpenedWals, WalRuntimes, WalsOpener},
+ rocksdb_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
+};
+
+use crate::{table, table::FixedSchemaTable};
#[derive(Debug, Snafu)]
pub enum Error {
@@ -248,3 +272,392 @@ impl<'a> From<&'a Vec<u8>> for WritePayload<'a> {
Self(data)
}
}
+
+/// Length of one day in milliseconds.
+const DAY_MS: i64 = 24 * 60 * 60 * 1000;
+
+/// Returns the current timestamp value moved back by three days.
+pub fn start_ms() -> i64 {
+    let now = Timestamp::now().as_i64();
+    now - 3 * DAY_MS
+}
+/// Tag carried by an engine build context and returned by its `open_method`.
+// NOTE(review): `TestContext::reopen_with_tables` in this file always goes
+// through `open_shard` and never reads this value — confirm whether it should.
+#[derive(Clone, Copy, Debug)]
+pub enum OpenTablesMethod {
+ WithOpenTable,
+ WithOpenShard,
+}
+
+/// Shared environment of engine contexts: a temp directory, a config whose
+/// storage and WAL paths point at it, and the runtimes given to each context.
+pub struct TestEnv {
+ /// Owned only to keep the temp directory handle alive as long as the env.
+ _dir: TempDir,
+ // NOTE(review): `new_context` takes its config from the build context, not
+ // from this field — confirm where this config is consumed.
+ pub config: Config,
+ pub runtimes: Arc<EngineRuntimes>,
+}
+
+/// Builder of [`TestEnv`].
+pub struct Builder {
+ /// Worker thread count of the runtime created by `build`.
+ num_workers: usize,
+}
+
+impl Builder {
+    /// Creates a [`TestEnv`].
+    ///
+    /// A fresh temp directory backs both the local object store and the
+    /// RocksDB WAL, all caches are sized to zero, and one runtime with
+    /// `num_workers` worker threads is shared by every engine runtime slot.
+    /// Panics if the directory or the runtime cannot be created, or if the
+    /// directory path is not valid UTF-8.
+    pub fn build(self) -> TestEnv {
+        let dir = tempfile::tempdir().unwrap();
+        let data_dir = dir.path().to_str().unwrap().to_string();
+
+        let storage = StorageOptions {
+            mem_cache_capacity: ReadableSize::mb(0),
+            mem_cache_partition_bits: 0,
+            disk_cache_dir: "".to_string(),
+            disk_cache_capacity: ReadableSize::mb(0),
+            disk_cache_page_size: ReadableSize::mb(0),
+            disk_cache_partition_bits: 0,
+            object_store: ObjectStoreOptions::Local(LocalOptions {
+                data_dir: data_dir.clone(),
+            }),
+        };
+        let wal = WalConfig {
+            storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                data_dir,
+                ..Default::default()
+            })),
+            disable_data: false,
+        };
+        let config = Config {
+            storage,
+            wal,
+            ..Default::default()
+        };
+
+        let runtime = runtime::Builder::default()
+            .worker_threads(self.num_workers)
+            .enable_all()
+            .build()
+            .unwrap();
+        let runtime = Arc::new(runtime);
+
+        let runtimes = EngineRuntimes {
+            read_runtime: PriorityRuntime::new(runtime.clone(), runtime.clone()),
+            write_runtime: runtime.clone(),
+            meta_runtime: runtime.clone(),
+            compact_runtime: runtime.clone(),
+            default_runtime: runtime.clone(),
+            io_runtime: runtime,
+        };
+
+        TestEnv {
+            _dir: dir,
+            config,
+            runtimes: Arc::new(runtimes),
+        }
+    }
+}
+
+impl Default for Builder {
+    /// Defaults to two worker threads.
+    fn default() -> Self {
+        Builder { num_workers: 2 }
+    }
+}
+
+/// Supplies what `TestEnv::new_context` needs to set up an engine context:
+/// a WAL opener and an engine config, plus an open-method tag.
+pub trait EngineBuildContext: Clone + Default {
+ /// Type of the WAL opener handed out by `wals_opener`.
+ type WalsOpener: WalsOpener;
+
+ fn wals_opener(&self) -> Self::WalsOpener;
+ fn config(&self) -> Config;
+ fn open_method(&self) -> OpenTablesMethod;
+}
+
+/// [`EngineBuildContext`] whose config uses a RocksDB WAL and a local object
+/// store.
+pub struct RocksDBEngineBuildContext {
+ config: Config,
+ open_method: OpenTablesMethod,
+}
+
+impl RocksDBEngineBuildContext {
+    /// Creates a default context, then overrides the recover mode of its
+    /// config and its open method.
+    pub fn new(mode: RecoverMode, open_method: OpenTablesMethod) -> Self {
+        let Self { mut config, .. } = Self::default();
+        config.recover_mode = mode;
+
+        Self {
+            config,
+            open_method,
+        }
+    }
+}
+
+impl Default for RocksDBEngineBuildContext {
+    /// Default context: local object store and RocksDB WAL under one freshly
+    /// created temp directory path, all caches sized to zero, open method
+    /// `WithOpenTable`.
+    ///
+    /// NOTE(review): `dir` is a local `TempDir` that is dropped when this
+    /// function returns, so only its path string is kept. Per tempfile's
+    /// documented drop behaviour the directory is removed at that point;
+    /// whatever is created under the path afterwards is presumably never
+    /// cleaned up — confirm.
+    fn default() -> Self {
+        let dir = tempfile::tempdir().unwrap();
+        let data_dir = dir.path().to_str().unwrap().to_string();
+
+        let storage = StorageOptions {
+            mem_cache_capacity: ReadableSize::mb(0),
+            mem_cache_partition_bits: 0,
+            disk_cache_dir: "".to_string(),
+            disk_cache_capacity: ReadableSize::mb(0),
+            disk_cache_page_size: ReadableSize::mb(0),
+            disk_cache_partition_bits: 0,
+            object_store: ObjectStoreOptions::Local(LocalOptions {
+                data_dir: data_dir.clone(),
+            }),
+        };
+        let wal = WalConfig {
+            storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                data_dir,
+                ..Default::default()
+            })),
+            disable_data: false,
+        };
+
+        Self {
+            config: Config {
+                storage,
+                wal,
+                ..Default::default()
+            },
+            open_method: OpenTablesMethod::WithOpenTable,
+        }
+    }
+}
+
+impl Clone for RocksDBEngineBuildContext {
+    /// Clones the context but points storage and WAL at a new temp directory
+    /// path, so a clone does not share data directories with its source.
+    /// Every other config value and the open method are copied.
+    ///
+    /// NOTE(review): the new `TempDir` is a local dropped on return, so only
+    /// its path string is kept; per tempfile's documented drop behaviour the
+    /// directory itself is removed at that point — confirm this is intended.
+    fn clone(&self) -> Self {
+        let mut config = self.config.clone();
+
+        let dir = tempfile::tempdir().unwrap();
+        let data_dir = dir.path().to_str().unwrap().to_string();
+
+        config.storage = StorageOptions {
+            mem_cache_capacity: ReadableSize::mb(0),
+            mem_cache_partition_bits: 0,
+            disk_cache_dir: "".to_string(),
+            disk_cache_capacity: ReadableSize::mb(0),
+            disk_cache_page_size: ReadableSize::mb(0),
+            disk_cache_partition_bits: 0,
+            object_store: ObjectStoreOptions::Local(LocalOptions {
+                data_dir: data_dir.clone(),
+            }),
+        };
+        config.wal = WalConfig {
+            storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                data_dir,
+                ..Default::default()
+            })),
+            disable_data: false,
+        };
+
+        Self {
+            config,
+            open_method: self.open_method,
+        }
+    }
+}
+
+/// Accessors exposing the stored config and open method, and a RocksDB WAL
+/// opener.
+impl EngineBuildContext for RocksDBEngineBuildContext {
+ type WalsOpener = RocksDBWalsOpener;
+
+ /// Returns a new `RocksDBWalsOpener` value.
+ fn wals_opener(&self) -> Self::WalsOpener {
+ RocksDBWalsOpener
+ }
+
+ /// Returns a clone of the stored config.
+ fn config(&self) -> Config {
+ self.config.clone()
+ }
+
+ /// Returns the stored open method.
+ fn open_method(&self) -> OpenTablesMethod {
+ self.open_method
+ }
+}
+
+/// Engine context: owns the engine, its opened WALs and the tables opened or
+/// created through it.
+pub struct TestContext<T> {
+ /// Engine config, also the source of the WAL config used by `open`.
+ config: Config,
+ /// Opens the WALs on the first `open` call.
+ wals_opener: T,
+ runtimes: Arc<EngineRuntimes>,
+ /// `None` until `open` has run, and while the engine is closed for a reopen.
+ engine: Option<TableEngineRef>,
+ /// WALs kept across reopens so `open` can reuse them.
+ opened_wals: Option<OpenedWals>,
+ /// Schema id put into create requests and table definitions.
+ schema_id: SchemaId,
+ /// Sequence of the most recently allocated table id.
+ last_table_seq: u32,
+
+ /// Tables currently open, keyed by table name.
+ name_to_tables: HashMap<String, TableRef>,
+}
+
+impl TestEnv {
+    /// Starts a [`Builder`] with default settings.
+    pub fn builder() -> Builder {
+        Default::default()
+    }
+
+    /// Creates an unopened [`TestContext`].
+    ///
+    /// Config and WAL opener come from `build_context`; the runtimes are the
+    /// ones of this environment. The context starts with schema id 100, table
+    /// sequence 1 and no tables.
+    pub fn new_context<T: EngineBuildContext>(
+        &self,
+        build_context: &T,
+    ) -> TestContext<T::WalsOpener> {
+        TestContext {
+            config: build_context.config(),
+            wals_opener: build_context.wals_opener(),
+            runtimes: self.runtimes.clone(),
+            engine: None,
+            opened_wals: None,
+            schema_id: SchemaId::from_u32(100),
+            last_table_seq: 1,
+            name_to_tables: HashMap::new(),
+        }
+    }
+
+    /// Blocks on `future` using the environment's default runtime.
+    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+        let runtime = &self.runtimes.default_runtime;
+        runtime.block_on(future)
+    }
+}
+
+/// Operations of an engine context: opening the engine, creating, writing and
+/// reading tables, and reopening the engine with a set of tables.
+/// Every fallible step is unwrapped, so failures panic.
+impl<T: WalsOpener> TestContext<T> {
+ /// Builds the table engine and stores it in `self.engine`.
+ ///
+ /// WALs opened by an earlier call are reused; otherwise they are opened
+ /// through `wals_opener` with the WAL section of the config.
+ pub async fn open(&mut self) {
+ let opened_wals = if let Some(opened_wals) = self.opened_wals.take() {
+ opened_wals
+ } else {
+ self.wals_opener
+ .open_wals(
+ &self.config.wal,
+ WalRuntimes {
+ read_runtime:
self.runtimes.read_runtime.high().clone(),
+ write_runtime: self.runtimes.write_runtime.clone(),
+ default_runtime: self.runtimes.default_runtime.clone(),
+ },
+ )
+ .await
+ .unwrap()
+ };
+
+ let engine_builder = EngineBuilder {
+ config: &self.config,
+ engine_runtimes: self.runtimes.clone(),
+ opened_wals: opened_wals.clone(),
+ };
+ // Put the WALs back so the next `open` (after a close) reuses them.
+ self.opened_wals = Some(opened_wals);
+
+ let TableEngineContext { table_engine, .. } =
engine_builder.build().await.unwrap();
+ self.engine = Some(table_engine);
+ }
+
+ /// Creates a table named `table_name` with the fixed default schema, the
+ /// next table id and a TTL option of `7d`, and returns its handle.
+ pub async fn create_fixed_schema_table(&mut self, table_name: &str) ->
FixedSchemaTable {
+ let fixed_schema_table = FixedSchemaTable::builder()
+ .schema_id(self.schema_id)
+ .table_name(table_name.to_string())
+ .table_id(self.next_table_id())
+ .ttl("7d".parse::<ReadableDuration>().unwrap())
+ .build_fixed();
+
+ self.create_table(fixed_schema_table.create_request().clone())
+ .await;
+
+ fixed_schema_table
+ }
+
+ /// Increments the table sequence and builds a table id from it.
+ // NOTE(review): the id is built with schema id 2 while `self.schema_id` is
+ // initialised to 100 in `TestEnv::new_context` — confirm this is intended.
+ fn next_table_id(&mut self) -> TableId {
+ self.last_table_seq += 1;
+ table::new_table_id(2, self.last_table_seq)
+ }
+
+ /// Creates the table in the engine and registers it under its name.
+ async fn create_table(&mut self, create_request: CreateTableRequest) {
+ let table_name = create_request.params.table_name.clone();
+ let table = self.engine().create_table(create_request).await.unwrap();
+
+ self.name_to_tables.insert(table_name.to_string(), table);
+ }
+
+ /// Returns the engine; panics if it is not open.
+ #[inline]
+ pub fn engine(&self) -> &TableEngineRef {
+ self.engine.as_ref().unwrap()
+ }
+
+ /// Writes `row_group` to the named table; panics if the table is unknown.
+ pub async fn write_to_table(&self, table_name: &str, row_group: RowGroup) {
+ let table = self.table(table_name);
+
+ table.write(WriteRequest { row_group }).await.unwrap();
+ }
+
+ /// Returns a clone of the named table reference; panics if it is unknown.
+ pub fn table(&self, table_name: &str) -> TableRef {
+ self.name_to_tables.get(table_name).cloned().unwrap()
+ }
+
+ /// Reads the named table and collects every batch of the result stream.
+ pub async fn read_table(
+ &self,
+ table_name: &str,
+ read_request: ReadRequest,
+ ) -> Vec<RecordBatch> {
+ let table = self.table(table_name);
+
+ let mut stream = table.read(read_request).await.unwrap();
+ let mut record_batches = Vec::new();
+ while let Some(batch) = stream.next().await {
+ let batch = batch.unwrap();
+
+ record_batches.push(batch);
+ }
+
+ record_batches
+ }
+
+ /// Reads the named table through `partitioned_read` and collects the
+ /// batches of all streams, draining one stream after the other.
+ pub async fn partitioned_read_table(
+ &self,
+ table_name: &str,
+ read_request: ReadRequest,
+ ) -> Vec<RecordBatch> {
+ let table = self.table(table_name);
+
+ let streams = table.partitioned_read(read_request).await.unwrap();
+ let mut record_batches = Vec::new();
+
+ for mut stream in streams.streams {
+ while let Some(batch) = stream.next().await {
+ let batch = batch.unwrap();
+
+ record_batches.push(batch);
+ }
+ }
+
+ record_batches
+ }
+
+ /// Closes the engine, opens it again and reopens `tables` as members of
+ /// the default shard. Panics if a name in `tables` is not currently open.
+ pub async fn reopen_with_tables(&mut self, tables: &[&str]) {
+ // Look the ids up first: the name map is cleared before reopening.
+ let table_infos: Vec<_> = tables
+ .iter()
+ .map(|name| {
+ let table_id = self.name_to_tables.get(*name).unwrap().id();
+ (table_id, *name)
+ })
+ .collect();
+ {
+ // Close all tables.
+ self.name_to_tables.clear();
+
+ // Close engine.
+ let engine = self.engine.take().unwrap();
+ engine.close().await.unwrap();
+ }
+
+ self.open().await;
+
+ self.open_tables_of_shard(table_infos, DEFAULT_SHARD_ID)
+ .await;
+ }
+
+ /// Opens the given tables with a single open-shard request and registers
+ /// each returned table under its name.
+ async fn open_tables_of_shard(&mut self, table_infos: Vec<(TableId,
&str)>, shard_id: ShardId) {
+ let table_defs = table_infos
+ .into_iter()
+ .map(|table| TableDef {
+ catalog_name: "horaedb".to_string(),
+ schema_name: "public".to_string(),
+ schema_id: self.schema_id,
+ id: table.0,
+ name: table.1.to_string(),
+ })
+ .collect();
+
+ let open_shard_request = OpenShardRequest {
+ shard_id,
+ table_defs,
+ engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
+ };
+
+ let tables = self
+ .engine()
+ .open_shard(open_shard_request)
+ .await
+ .unwrap()
+ .into_values()
+ .map(|result| result.unwrap().unwrap());
+
+ for table in tables {
+ self.name_to_tables.insert(table.name().to_string(), table);
+ }
+ }
+
+ /// Returns the map of currently open tables, keyed by name.
+ pub fn name_to_tables(&self) -> &HashMap<String, TableRef> {
+ &self.name_to_tables
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]