This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cee69fa feat: add wal replay benchmark (#1511)
0cee69fa is described below

commit 0cee69fa4dad05cd1d188497daf7a5d6072a1d76
Author: MianChen <[email protected]>
AuthorDate: Wed Apr 17 18:51:09 2024 +0800

    feat: add wal replay benchmark (#1511)
---
 Cargo.lock                         |   1 +
 src/benchmarks/Cargo.toml          |   2 +
 src/benchmarks/bench.toml          |   4 +
 src/benchmarks/benches/bench.rs    |  22 +-
 src/benchmarks/config/bench.toml   |   5 +
 src/benchmarks/src/config.rs       |   8 +
 src/benchmarks/src/lib.rs          |   2 +
 src/benchmarks/src/replay_bench.rs |  97 +++++++++
 src/benchmarks/src/table.rs        | 246 +++++++++++++++++++++
 src/benchmarks/src/util.rs         | 423 ++++++++++++++++++++++++++++++++++++-
 10 files changed, 804 insertions(+), 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 24230bde..c8e331b0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -939,6 +939,7 @@ dependencies = [
  "snafu 0.6.10",
  "table_engine",
  "table_kv",
+ "tempfile",
  "time_ext",
  "tokio",
  "toml_ext",
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 843e2fc7..031d0723 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -54,6 +54,7 @@ size_ext = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
 table_kv = { workspace = true }
+tempfile = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
 toml_ext = { workspace = true }
@@ -63,6 +64,7 @@ zstd = { workspace = true }
 
 [dev-dependencies]
 criterion = { workspace = true }
+tempfile = { workspace = true }
 
 [[bench]]
 name = "bench"
diff --git a/src/benchmarks/bench.toml b/src/benchmarks/bench.toml
index 9e9e0331..b76f779f 100644
--- a/src/benchmarks/bench.toml
+++ b/src/benchmarks/bench.toml
@@ -64,3 +64,9 @@ bench_measurement_time = "60s"
 bench_sample_size = 60
 batch_size = 512
 value_size = 1024
+
+[replay_bench]
+bench_measurement_time = "3s"
+bench_sample_size = 10
+# Rows written to each table per bench iteration; `ReplayConfig` requires this key.
+batch_size = 10000
diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs
index 7f5f5347..cb5d76ed 100644
--- a/src/benchmarks/benches/bench.rs
+++ b/src/benchmarks/benches/bench.rs
@@ -17,13 +17,14 @@
 
 //! Benchmarks
 
-use std::sync::Once;
+use std::{cell::RefCell, sync::Once};
 
 use benchmarks::{
     config::{self, BenchConfig},
     merge_memtable_bench::MergeMemTableBench,
     merge_sst_bench::MergeSstBench,
     parquet_bench::ParquetBench,
+    replay_bench::ReplayBench,
     scan_memtable_bench::ScanMemTableBench,
     sst_bench::SstBench,
     wal_write_bench::WalWriteBench,
@@ -208,6 +209,24 @@ fn bench_wal_write(c: &mut Criterion) {
     group.finish();
 }
 
+fn bench_replay_iter(b: &mut Bencher<'_>, bench: &RefCell<ReplayBench>) {
+    let mut bench = bench.borrow_mut();
+    b.iter(|| bench.run_bench())
+}
+
+fn bench_replay(c: &mut Criterion) {
+    let config = init_bench();
+    let replay_config = config.replay_bench;
+    let mut group = c.benchmark_group("replay");
+    group.measurement_time(replay_config.bench_measurement_time.0);
+    group.sample_size(replay_config.bench_sample_size);
+
+    // The input is handed to `bench_replay_iter` as `&I`; `RefCell` supplies the `&mut`.
+    let bench = RefCell::new(ReplayBench::new(replay_config));
+    group.bench_with_input(BenchmarkId::new("replay", 0), &bench, bench_replay_iter);
+    group.finish();
+}
+
 criterion_group!(
     name = benches;
     config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
@@ -217,6 +236,7 @@ criterion_group!(
     bench_scan_memtable,
     bench_merge_memtable,
     bench_wal_write,
+    bench_replay,
 );
 
 criterion_main!(benches);
diff --git a/src/benchmarks/config/bench.toml b/src/benchmarks/config/bench.toml
index 41a08af7..ad66fdea 100644
--- a/src/benchmarks/config/bench.toml
+++ b/src/benchmarks/config/bench.toml
@@ -71,3 +71,8 @@ bench_measurement_time = "60s"
 bench_sample_size = 60
 batch_size = 512
 value_size = 1024
+
+[replay_bench]
+bench_measurement_time = "3s"
+bench_sample_size = 10
+batch_size = 10000
diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs
index b90a7cb5..493eb7c6 100644
--- a/src/benchmarks/src/config.rs
+++ b/src/benchmarks/src/config.rs
@@ -38,6 +38,7 @@ pub struct BenchConfig {
     pub scan_memtable_bench: ScanMemTableBenchConfig,
     pub merge_memtable_bench: MergeMemTableBenchConfig,
     pub wal_write_bench: WalWriteBenchConfig,
+    pub replay_bench: ReplayConfig,
 }
 
 // TODO(yingwen): Maybe we can use layze static to load config first.
@@ -147,3 +148,10 @@ pub struct WalWriteBenchConfig {
     pub batch_size: usize,
     pub value_size: usize,
 }
+
+#[derive(Deserialize)]
+pub struct ReplayConfig {
+    pub bench_measurement_time: ReadableDuration,
+    pub bench_sample_size: usize,
+    pub batch_size: usize,
+}
diff --git a/src/benchmarks/src/lib.rs b/src/benchmarks/src/lib.rs
index 21142dd2..ffc098c5 100644
--- a/src/benchmarks/src/lib.rs
+++ b/src/benchmarks/src/lib.rs
@@ -23,9 +23,11 @@ pub mod config;
 pub mod merge_memtable_bench;
 pub mod merge_sst_bench;
 pub mod parquet_bench;
+pub mod replay_bench;
 pub mod scan_memtable_bench;
 pub mod sst_bench;
 pub mod sst_tools;
+pub mod table;
 pub mod util;
 pub mod wal_write_bench;
 
diff --git a/src/benchmarks/src/replay_bench.rs b/src/benchmarks/src/replay_bench.rs
new file mode 100644
index 00000000..bf2c9a88
--- /dev/null
+++ b/src/benchmarks/src/replay_bench.rs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Replay bench.
+
+use std::sync::Arc;
+
+use analytic_engine::RecoverMode;
+use runtime::Runtime;
+use util::{OpenTablesMethod, RocksDBEngineBuildContext, TestContext, TestEnv};
+use wal::rocksdb_impl::manager::RocksDBWalsOpener;
+
+use crate::{config::ReplayConfig, table::FixedSchemaTable, util};
+
+pub struct ReplayBench {
+    runtime: Arc<Runtime>,
+    test_ctx: TestContext<RocksDBWalsOpener>,
+    table: FixedSchemaTable,
+    batch_size: usize,
+}
+
+impl ReplayBench {
+    pub fn new(config: ReplayConfig) -> Self {
+        let runtime = util::new_runtime(1);
+        let engine_context = RocksDBEngineBuildContext::new(
+            RecoverMode::TableBased,
+            OpenTablesMethod::WithOpenShard,
+        );
+        let env: TestEnv = TestEnv::builder().build();
+
+        let (test_ctx, fixed_schema_table) = env.block_on(async {
+            let mut test_ctx = env.new_context(&engine_context);
+            test_ctx.open().await;
+
+            let fixed_schema_table = test_ctx
+                .create_fixed_schema_table("test_replay_table1")
+                .await;
+            let _ = test_ctx
+                .create_fixed_schema_table("test_replay_table2")
+                .await;
+            let _ = test_ctx
+                .create_fixed_schema_table("test_replay_table3")
+                .await;
+
+            (test_ctx, fixed_schema_table)
+        });
+
+        ReplayBench {
+            runtime: Arc::new(runtime),
+            test_ctx,
+            table: fixed_schema_table,
+            batch_size: config.batch_size,
+        }
+    }
+
+    /// Runs one benchmark iteration on the bench's own runtime.
+    ///
+    /// Regenerates `batch_size` rows, writes them to every table held by the test
+    /// context, then closes and reopens the engine with those same tables. Both
+    /// the writes and the reopen are inside the measured iteration.
+    ///
+    /// Panics if a write or the reopen fails; the context helpers `unwrap` their results.
+    pub fn run_bench(&mut self) {
+        self.runtime.block_on(async {
+            self.table.prepare_write_requests(self.batch_size);
+            let rows = self.table.row_tuples();
+
+            // Snapshot the names: the list is needed again after the writes.
+            let table_names: Vec<String> =
+                self.test_ctx.name_to_tables().keys().cloned().collect();
+
+            // Write data to table.
+            for table_name in &table_names {
+                let row_group = self.table.rows_to_row_group(&rows);
+                self.test_ctx.write_to_table(table_name, row_group).await;
+            }
+
+            // Reopen db.
+            let table_refs: Vec<&str> = table_names.iter().map(String::as_str).collect();
+            self.test_ctx.reopen_with_tables(&table_refs).await;
+        });
+    }
+}
diff --git a/src/benchmarks/src/table.rs b/src/benchmarks/src/table.rs
new file mode 100644
index 00000000..31df4234
--- /dev/null
+++ b/src/benchmarks/src/table.rs
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utils to create table.
+
+use std::collections::HashMap;
+
+use common_types::{
+    column_schema,
+    datum::{Datum, DatumKind},
+    row::{Row, RowGroup},
+    schema::{self, Schema},
+    table::DEFAULT_SHARD_ID,
+    time::Timestamp,
+};
+use table_engine::{
+    self,
+    engine::{CreateTableParams, CreateTableRequest, TableState},
+    table::{SchemaId, TableId, TableSeq},
+};
+use time_ext::ReadableDuration;
+
+use crate::util::start_ms;
+
+/// Builds a [`Row`] from a 6-tuple by converting each element into a [`Datum`], in order.
+pub fn new_row_6<C0, C1, C2, C3, C4, C5>(data: (C0, C1, C2, C3, C4, C5)) -> Row
+where
+    C0: Into<Datum>,
+    C1: Into<Datum>,
+    C2: Into<Datum>,
+    C3: Into<Datum>,
+    C4: Into<Datum>,
+    C5: Into<Datum>,
+{
+    let (c0, c1, c2, c3, c4, c5) = data;
+    Row::from_datums(vec![
+        c0.into(),
+        c1.into(),
+        c2.into(),
+        c3.into(),
+        c4.into(),
+        c5.into(),
+    ])
+}
+
+pub type WriteRequestTuple = (String, Timestamp, String, f64, f64, String);
+pub type RowTuple<'a> = (&'a str, Timestamp, &'a str, f64, f64, &'a str);
+
+pub fn new_table_id(schema_id: u16, table_seq: u32) -> TableId {
+    TableId::with_seq(SchemaId::from(schema_id), TableSeq::from(table_seq)).unwrap()
+}
+
+pub struct RowTupleGenerator {}
+
+pub struct FixedSchemaTable {
+    create_request: CreateTableRequest,
+    write_requests: Vec<WriteRequestTuple>,
+}
+
+impl FixedSchemaTable {
+    pub fn builder() -> Builder {
+        Builder::default()
+    }
+
+    fn default_schema() -> Schema {
+        Self::default_schema_builder().build().unwrap()
+    }
+
+    pub fn default_schema_builder() -> schema::Builder {
+        create_schema_builder(
+            // Key columns
+            &[("key", DatumKind::String), ("ts", DatumKind::Timestamp)],
+            // Normal columns
+            &[
+                ("string_tag", DatumKind::String),
+                ("double_field1", DatumKind::Double),
+                ("double_field2", DatumKind::Double),
+                ("string_field2", DatumKind::String),
+            ],
+        )
+    }
+
+    #[inline]
+    pub fn table_id(&self) -> TableId {
+        self.create_request.table_id
+    }
+
+    #[inline]
+    pub fn create_request(&self) -> &CreateTableRequest {
+        &self.create_request
+    }
+
+    fn new_row(data: RowTuple) -> Row {
+        new_row_6(data)
+    }
+
+    pub fn rows_to_row_group(&self, data: &[RowTuple]) -> RowGroup {
+        let rows = data
+            .iter()
+            .copied()
+            .map(FixedSchemaTable::new_row)
+            .collect();
+
+        self.new_row_group(rows)
+    }
+
+    fn new_row_group(&self, rows: Vec<Row>) -> RowGroup {
+        RowGroup::try_new(self.create_request.params.table_schema.clone(), rows).unwrap()
+    }
+
+    pub fn prepare_write_requests(&mut self, batch_size: usize) {
+        let base_ms = start_ms();
+        self.write_requests.clear();
+        self.write_requests.extend((0..batch_size).map(|idx| {
+            (
+                format!("key_{idx}"),
+                Timestamp::new(base_ms + idx as i64),
+                format!("tag1_{idx}"),
+                11.0,
+                110.0,
+                format!("tag2_{idx}"),
+            )
+        }));
+    }
+
+    pub fn row_tuples(&self) -> Vec<RowTuple> {
+        self.write_requests
+            .iter()
+            .map(|x| (x.0.as_str(), x.1, x.2.as_str(), x.3, x.4, x.5.as_str()))
+            .collect()
+    }
+}
+
+#[must_use]
+pub struct Builder {
+    create_request: CreateTableRequest,
+}
+
+impl Builder {
+    pub fn schema_id(mut self, schema_id: SchemaId) -> Self {
+        self.create_request.schema_id = schema_id;
+        self
+    }
+
+    pub fn table_name(mut self, table_name: String) -> Self {
+        self.create_request.params.table_name = table_name;
+        self
+    }
+
+    pub fn table_id(mut self, table_id: TableId) -> Self {
+        self.create_request.table_id = table_id;
+        self
+    }
+
+    pub fn enable_ttl(mut self, enable_ttl: bool) -> Self {
+        self.create_request.params.table_options.insert(
+            common_types::OPTION_KEY_ENABLE_TTL.to_string(),
+            enable_ttl.to_string(),
+        );
+        self
+    }
+
+    pub fn ttl(mut self, duration: ReadableDuration) -> Self {
+        self.create_request
+            .params
+            .table_options
+            .insert(common_types::TTL.to_string(), duration.to_string());
+        self
+    }
+
+    pub fn build_fixed(self) -> FixedSchemaTable {
+        FixedSchemaTable {
+            create_request: self.create_request,
+            write_requests: Vec::new(),
+        }
+    }
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        let params = CreateTableParams {
+            catalog_name: "horaedb".to_string(),
+            schema_name: "public".to_string(),
+            table_name: "test_table".to_string(),
+            table_schema: FixedSchemaTable::default_schema(),
+            partition_info: None,
+            engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
+            table_options: HashMap::new(),
+        };
+
+        Self {
+            create_request: CreateTableRequest {
+                params,
+                schema_id: SchemaId::from_u32(2),
+                table_id: new_table_id(2, 1),
+                state: TableState::Stable,
+                shard_id: DEFAULT_SHARD_ID,
+            },
+        }
+    }
+}
+
+// Format of input slice: &[ ( column name, column type ) ]
+pub fn create_schema_builder(
+    key_tuples: &[(&str, DatumKind)],
+    normal_tuples: &[(&str, DatumKind)],
+) -> schema::Builder {
+    assert!(!key_tuples.is_empty());
+
+    let mut schema_builder = schema::Builder::with_capacity(key_tuples.len() + normal_tuples.len())
+        .auto_increment_column_id(true)
+        .primary_key_indexes((0..key_tuples.len()).collect());
+
+    // Key columns come first and are never nullable; normal columns below are nullable.
+    for &(name, kind) in key_tuples {
+        let column_schema = column_schema::Builder::new(name.to_string(), kind)
+            .is_nullable(false)
+            .build()
+            .expect("Should succeed to build key column schema");
+        schema_builder = schema_builder.add_key_column(column_schema).unwrap();
+    }
+
+    for &(name, kind) in normal_tuples {
+        let column_schema = column_schema::Builder::new(name.to_string(), kind)
+            .is_nullable(true)
+            .build()
+            .expect("Should succeed to build normal column schema");
+        schema_builder = schema_builder.add_normal_column(column_schema).unwrap();
+    }
+
+    schema_builder
+}
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 188081d8..cb6d8de9 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -17,10 +17,11 @@
 
 //! Utilities.
 
-use std::sync::Arc;
+use std::{collections::HashMap, future::Future, sync::Arc};
 
 use analytic_engine::{
     memtable::{key::KeySequence, MemTableRef, PutContext},
+    setup::{EngineBuilder, TableEngineContext},
     space::SpaceId,
     sst::{
         factory::{
@@ -34,19 +35,42 @@ use analytic_engine::{
     },
     table::sst_util,
     table_options::StorageFormat,
+    Config, RecoverMode,
 };
 use bytes_ext::{BufMut, SafeBufMut};
 use common_types::{
     projected_schema::{ProjectedSchema, RowProjectorBuilder},
+    record_batch::RecordBatch,
+    row::RowGroup,
     schema::{IndexInWriterSchema, Schema},
+    table::{ShardId, DEFAULT_SHARD_ID},
+    time::Timestamp,
 };
+use futures::stream::StreamExt;
 use macros::define_result;
-use object_store::{ObjectStoreRef, Path};
+use object_store::{
+    config::{LocalOptions, ObjectStoreOptions, StorageOptions},
+    ObjectStoreRef, Path,
+};
 use parquet::file::footer;
-use runtime::Runtime;
+use runtime::{PriorityRuntime, Runtime};
+use size_ext::ReadableSize;
 use snafu::{ResultExt, Snafu};
-use table_engine::{predicate::Predicate, table::TableId};
-use wal::log_batch::Payload;
+use table_engine::{
+    engine::{CreateTableRequest, EngineRuntimes, OpenShardRequest, TableDef, TableEngineRef},
+    predicate::Predicate,
+    table::{ReadRequest, SchemaId, TableId, TableRef, WriteRequest},
+};
+use tempfile::TempDir;
+use time_ext::ReadableDuration;
+use wal::{
+    config::{Config as WalConfig, StorageConfig},
+    log_batch::Payload,
+    manager::{OpenedWals, WalRuntimes, WalsOpener},
+    rocksdb_impl::{config::RocksDBStorageConfig, manager::RocksDBWalsOpener},
+};
+
+use crate::{table, table::FixedSchemaTable};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -248,3 +272,392 @@ impl<'a> From<&'a Vec<u8>> for WritePayload<'a> {
         Self(data)
     }
 }
+
+const DAY_MS: i64 = 24 * 60 * 60 * 1000;
+/// 3 days ago.
+pub fn start_ms() -> i64 {
+    Timestamp::now().as_i64() - 3 * DAY_MS
+}
+#[derive(Clone, Copy, Debug)]
+pub enum OpenTablesMethod {
+    WithOpenTable,
+    WithOpenShard,
+}
+
+pub struct TestEnv {
+    _dir: TempDir,
+    pub config: Config,
+    pub runtimes: Arc<EngineRuntimes>,
+}
+
+pub struct Builder {
+    num_workers: usize,
+}
+
+impl Builder {
+    pub fn build(self) -> TestEnv {
+        let dir = tempfile::tempdir().unwrap();
+
+        let config = Config {
+            storage: StorageOptions {
+                mem_cache_capacity: ReadableSize::mb(0),
+                mem_cache_partition_bits: 0,
+                disk_cache_dir: "".to_string(),
+                disk_cache_capacity: ReadableSize::mb(0),
+                disk_cache_page_size: ReadableSize::mb(0),
+                disk_cache_partition_bits: 0,
+                object_store: ObjectStoreOptions::Local(LocalOptions {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                }),
+            },
+            wal: WalConfig {
+                storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                    ..Default::default()
+                })),
+                disable_data: false,
+            },
+            ..Default::default()
+        };
+
+        let runtime = Arc::new(
+            runtime::Builder::default()
+                .worker_threads(self.num_workers)
+                .enable_all()
+                .build()
+                .unwrap(),
+        );
+
+        TestEnv {
+            _dir: dir,
+            config,
+            runtimes: Arc::new(EngineRuntimes {
+                read_runtime: PriorityRuntime::new(runtime.clone(), runtime.clone()),
+                write_runtime: runtime.clone(),
+                meta_runtime: runtime.clone(),
+                compact_runtime: runtime.clone(),
+                default_runtime: runtime.clone(),
+                io_runtime: runtime,
+            }),
+        }
+    }
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        Self { num_workers: 2 }
+    }
+}
+
+pub trait EngineBuildContext: Clone + Default {
+    type WalsOpener: WalsOpener;
+
+    fn wals_opener(&self) -> Self::WalsOpener;
+    fn config(&self) -> Config;
+    fn open_method(&self) -> OpenTablesMethod;
+}
+
+pub struct RocksDBEngineBuildContext {
+    config: Config,
+    open_method: OpenTablesMethod,
+}
+
+impl RocksDBEngineBuildContext {
+    pub fn new(mode: RecoverMode, open_method: OpenTablesMethod) -> Self {
+        let mut context = Self::default();
+        context.config.recover_mode = mode;
+        context.open_method = open_method;
+
+        context
+    }
+}
+
+impl Default for RocksDBEngineBuildContext {
+    fn default() -> Self {
+        let dir = tempfile::tempdir().unwrap();
+
+        let config = Config {
+            storage: StorageOptions {
+                mem_cache_capacity: ReadableSize::mb(0),
+                mem_cache_partition_bits: 0,
+                disk_cache_dir: "".to_string(),
+                disk_cache_capacity: ReadableSize::mb(0),
+                disk_cache_page_size: ReadableSize::mb(0),
+                disk_cache_partition_bits: 0,
+                object_store: ObjectStoreOptions::Local(LocalOptions {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                }),
+            },
+            wal: WalConfig {
+                storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                    data_dir: dir.path().to_str().unwrap().to_string(),
+                    ..Default::default()
+                })),
+                disable_data: false,
+            },
+            ..Default::default()
+        };
+
+        Self {
+            config,
+            open_method: OpenTablesMethod::WithOpenTable,
+        }
+    }
+}
+
+impl Clone for RocksDBEngineBuildContext {
+    fn clone(&self) -> Self {
+        let mut config = self.config.clone();
+
+        let dir = tempfile::tempdir().unwrap();
+        let storage = StorageOptions {
+            mem_cache_capacity: ReadableSize::mb(0),
+            mem_cache_partition_bits: 0,
+            disk_cache_dir: "".to_string(),
+            disk_cache_capacity: ReadableSize::mb(0),
+            disk_cache_page_size: ReadableSize::mb(0),
+            disk_cache_partition_bits: 0,
+            object_store: ObjectStoreOptions::Local(LocalOptions {
+                data_dir: dir.path().to_str().unwrap().to_string(),
+            }),
+        };
+
+        config.storage = storage;
+        config.wal = WalConfig {
+            storage: StorageConfig::RocksDB(Box::new(RocksDBStorageConfig {
+                data_dir: dir.path().to_str().unwrap().to_string(),
+                ..Default::default()
+            })),
+            disable_data: false,
+        };
+        Self {
+            config,
+            open_method: self.open_method,
+        }
+    }
+}
+
+impl EngineBuildContext for RocksDBEngineBuildContext {
+    type WalsOpener = RocksDBWalsOpener;
+
+    fn wals_opener(&self) -> Self::WalsOpener {
+        RocksDBWalsOpener
+    }
+
+    fn config(&self) -> Config {
+        self.config.clone()
+    }
+
+    fn open_method(&self) -> OpenTablesMethod {
+        self.open_method
+    }
+}
+
+pub struct TestContext<T> {
+    config: Config,
+    wals_opener: T,
+    runtimes: Arc<EngineRuntimes>,
+    engine: Option<TableEngineRef>,
+    opened_wals: Option<OpenedWals>,
+    schema_id: SchemaId,
+    last_table_seq: u32,
+
+    name_to_tables: HashMap<String, TableRef>,
+}
+
+impl TestEnv {
+    pub fn builder() -> Builder {
+        Builder::default()
+    }
+
+    pub fn new_context<T: EngineBuildContext>(
+        &self,
+        build_context: &T,
+    ) -> TestContext<T::WalsOpener> {
+        let config = build_context.config();
+        let wals_opener = build_context.wals_opener();
+
+        TestContext {
+            config,
+            wals_opener,
+            runtimes: self.runtimes.clone(),
+            engine: None,
+            opened_wals: None,
+            schema_id: SchemaId::from_u32(100),
+            last_table_seq: 1,
+            name_to_tables: HashMap::new(),
+        }
+    }
+
+    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+        self.runtimes.default_runtime.block_on(future)
+    }
+}
+
+impl<T: WalsOpener> TestContext<T> {
+    pub async fn open(&mut self) {
+        let opened_wals = if let Some(opened_wals) = self.opened_wals.take() {
+            opened_wals
+        } else {
+            self.wals_opener
+                .open_wals(
+                    &self.config.wal,
+                    WalRuntimes {
+                        read_runtime: self.runtimes.read_runtime.high().clone(),
+                        write_runtime: self.runtimes.write_runtime.clone(),
+                        default_runtime: self.runtimes.default_runtime.clone(),
+                    },
+                )
+                .await
+                .unwrap()
+        };
+
+        let engine_builder = EngineBuilder {
+            config: &self.config,
+            engine_runtimes: self.runtimes.clone(),
+            opened_wals: opened_wals.clone(),
+        };
+        self.opened_wals = Some(opened_wals);
+
+        let TableEngineContext { table_engine, .. } = engine_builder.build().await.unwrap();
+        self.engine = Some(table_engine);
+    }
+
+    pub async fn create_fixed_schema_table(&mut self, table_name: &str) -> FixedSchemaTable {
+        let fixed_schema_table = FixedSchemaTable::builder()
+            .schema_id(self.schema_id)
+            .table_name(table_name.to_string())
+            .table_id(self.next_table_id())
+            .ttl("7d".parse::<ReadableDuration>().unwrap())
+            .build_fixed();
+
+        self.create_table(fixed_schema_table.create_request().clone())
+            .await;
+
+        fixed_schema_table
+    }
+
+    fn next_table_id(&mut self) -> TableId {
+        self.last_table_seq += 1;
+        table::new_table_id(2, self.last_table_seq)
+    }
+
+    /// Creates the table in the engine and registers it by name; panics if creation fails.
+    async fn create_table(&mut self, create_request: CreateTableRequest) {
+        let table_name = create_request.params.table_name.clone();
+        let table = self.engine().create_table(create_request).await.unwrap();
+        self.name_to_tables.insert(table_name, table);
+    }
+
+    #[inline]
+    pub fn engine(&self) -> &TableEngineRef {
+        self.engine.as_ref().unwrap()
+    }
+
+    pub async fn write_to_table(&self, table_name: &str, row_group: RowGroup) {
+        let table = self.table(table_name);
+
+        table.write(WriteRequest { row_group }).await.unwrap();
+    }
+
+    pub fn table(&self, table_name: &str) -> TableRef {
+        self.name_to_tables.get(table_name).cloned().unwrap()
+    }
+
+    /// Reads `table_name` with `read_request` and gathers every batch of the stream.
+    ///
+    /// Panics if the table is unknown, the read cannot start, or a batch is an error.
+    pub async fn read_table(
+        &self,
+        table_name: &str,
+        read_request: ReadRequest,
+    ) -> Vec<RecordBatch> {
+        let table = self.table(table_name);
+        let mut stream = table.read(read_request).await.unwrap();
+        let mut record_batches = Vec::new();
+        while let Some(batch) = stream.next().await {
+            record_batches.push(batch.unwrap());
+        }
+
+        record_batches
+    }
+
+    /// Reads `table_name` through `partitioned_read` and gathers the batches of every stream.
+    ///
+    /// Streams are drained one after another. Panics if the table is unknown or a step fails.
+    pub async fn partitioned_read_table(
+        &self,
+        table_name: &str,
+        read_request: ReadRequest,
+    ) -> Vec<RecordBatch> {
+        let table = self.table(table_name);
+        let partitions = table.partitioned_read(read_request).await.unwrap();
+
+        let mut record_batches = Vec::new();
+        for mut stream in partitions.streams {
+            while let Some(batch) = stream.next().await {
+                record_batches.push(batch.unwrap());
+            }
+        }
+
+        record_batches
+    }
+
+    /// Closes the engine, opens it again and re-opens `tables` on the default shard.
+    ///
+    /// Panics if a name in `tables` is not currently open, or if closing/opening fails.
+    pub async fn reopen_with_tables(&mut self, tables: &[&str]) {
+        // Resolve the ids up front: the name map is cleared before the reopen.
+        let table_infos: Vec<_> = tables
+            .iter()
+            .map(|&name| (self.name_to_tables.get(name).unwrap().id(), name))
+            .collect();
+
+        // Close all tables.
+        self.name_to_tables.clear();
+
+        // Close engine. The taken handle is a temporary of this statement, so it is
+        // dropped before the engine is opened again.
+        self.engine.take().unwrap().close().await.unwrap();
+
+        self.open().await;
+
+        self.open_tables_of_shard(table_infos, DEFAULT_SHARD_ID)
+            .await;
+    }
+
+    async fn open_tables_of_shard(&mut self, table_infos: Vec<(TableId, &str)>, shard_id: ShardId) {
+        let table_defs = table_infos
+            .into_iter()
+            .map(|table| TableDef {
+                catalog_name: "horaedb".to_string(),
+                schema_name: "public".to_string(),
+                schema_id: self.schema_id,
+                id: table.0,
+                name: table.1.to_string(),
+            })
+            .collect();
+
+        let open_shard_request = OpenShardRequest {
+            shard_id,
+            table_defs,
+            engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(),
+        };
+
+        let tables = self
+            .engine()
+            .open_shard(open_shard_request)
+            .await
+            .unwrap()
+            .into_values()
+            .map(|result| result.unwrap().unwrap());
+
+        for table in tables {
+            self.name_to_tables.insert(table.name().to_string(), table);
+        }
+    }
+
+    pub fn name_to_tables(&self) -> &HashMap<String, TableRef> {
+        &self.name_to_tables
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to