This is an automated email from the ASF dual-hosted git repository.
xikai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new 53df95ac feat: impl layered memtable to reduce duplicated encode
during scan (#1271)
53df95ac is described below
commit 53df95ac507d7658b7e6abb44820a49cc62bf6d4
Author: kamille <[email protected]>
AuthorDate: Mon Jan 8 15:41:20 2024 +0800
feat: impl layered memtable to reduce duplicated encode during scan (#1271)
## Rationale
Conversion from the row format in the memtable to the record batch in DataFusion
has been found to be a CPU bottleneck in production. To reduce the CPU cost, I
implemented the layered memtable framework to support gradual conversion
during the normal write path (and before flush).
## Detailed Changes
+ Implement the layered memtable framework.
+ Integrate it into the write path.
## Test Plan
Tested by new unit tests and integration tests.
---
Cargo.lock | 32 +-
Cargo.toml | 2 +-
analytic_engine/src/instance/flush_compaction.rs | 30 +-
analytic_engine/src/lib.rs | 6 +
analytic_engine/src/memtable/columnar/factory.rs | 5 +-
analytic_engine/src/memtable/factory.rs | 9 +-
.../src/memtable/{skiplist => layered}/factory.rs | 41 +-
analytic_engine/src/memtable/layered/iter.rs | 120 ++++
analytic_engine/src/memtable/layered/mod.rs | 729 +++++++++++++++++++++
analytic_engine/src/memtable/mod.rs | 73 ++-
analytic_engine/src/memtable/skiplist/factory.rs | 4 +-
analytic_engine/src/memtable/skiplist/mod.rs | 133 ++--
analytic_engine/src/memtable/test_util.rs | 42 ++
.../src/row_iter/record_batch_stream.rs | 1 +
analytic_engine/src/sst/parquet/async_reader.rs | 19 +-
analytic_engine/src/table/data.rs | 49 +-
analytic_engine/src/table_meta_set_impl.rs | 1 +
analytic_engine/src/table_options.rs | 62 +-
benchmarks/src/scan_memtable_bench.rs | 6 +-
common_types/src/lib.rs | 1 +
common_types/src/projected_schema.rs | 2 +-
common_types/src/record_batch.rs | 27 +-
common_types/src/time.rs | 7 +
system_catalog/src/sys_catalog_table.rs | 5 +
24 files changed, 1254 insertions(+), 152 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index b1f6b823..088009fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -96,7 +96,7 @@ dependencies = [
"atomic_enum",
"base64 0.13.1",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"codec",
"common_types",
"datafusion",
@@ -1345,7 +1345,7 @@ dependencies = [
[[package]]
name = "ceresdbproto"
version = "1.0.23"
-source =
"git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc#cfdacccebb7c609cb1aac791b73ba9a838d7ade6"
+source =
"git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32#cb36a32b827b3afe88ca04420f7fd21518f15293"
dependencies = [
"prost",
"protoc-bin-vendored",
@@ -1528,7 +1528,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"common_types",
"etcd-client",
"future_ext",
@@ -1606,7 +1606,7 @@ dependencies = [
"arrow 43.0.0",
"arrow_ext",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"chrono",
"datafusion",
"hash_ext",
@@ -2362,7 +2362,7 @@ dependencies = [
"async-recursion",
"async-trait",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"common_types",
"datafusion",
"datafusion-proto",
@@ -3921,7 +3921,7 @@ name = "meta_client"
version = "1.2.6-alpha"
dependencies = [
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"common_types",
"futures 0.3.28",
"generic_error",
@@ -4446,7 +4446,7 @@ version = "1.2.6-alpha"
dependencies = [
"async-trait",
"bytes",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"chrono",
"clru",
"crc",
@@ -5323,7 +5323,7 @@ dependencies = [
"async-trait",
"bytes",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"clru",
"cluster",
"common_types",
@@ -5451,7 +5451,7 @@ dependencies = [
"arrow 43.0.0",
"async-trait",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"chrono",
"cluster",
"codec",
@@ -5765,7 +5765,7 @@ version = "1.2.6-alpha"
dependencies = [
"arrow_ext",
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"common_types",
"futures 0.3.28",
"generic_error",
@@ -5894,7 +5894,7 @@ name = "router"
version = "1.2.6-alpha"
dependencies = [
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"cluster",
"common_types",
"generic_error",
@@ -6269,7 +6269,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"clru",
"cluster",
"common_types",
@@ -6795,7 +6795,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"codec",
"common_types",
"futures 0.3.28",
@@ -6817,7 +6817,7 @@ dependencies = [
"arrow_ext",
"async-trait",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"common_types",
"datafusion",
"datafusion-proto",
@@ -7020,7 +7020,7 @@ dependencies = [
name = "time_ext"
version = "1.2.6-alpha"
dependencies = [
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"chrono",
"common_types",
"macros",
@@ -7672,7 +7672,7 @@ version = "1.2.6-alpha"
dependencies = [
"async-trait",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
"chrono",
"codec",
"common_types",
diff --git a/Cargo.toml b/Cargo.toml
index 1588c237..e20984f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
-ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev =
"cfdaccc" }
+ceresdbproto = { git =
"https://github.com/apache/incubator-horaedb-proto.git", rev = "cb36a32" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
diff --git a/analytic_engine/src/instance/flush_compaction.rs
b/analytic_engine/src/instance/flush_compaction.rs
index 880eb10c..b051fb04 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -63,7 +63,7 @@ use crate::{
factory::{self, ColumnStats, ScanOptions, SstWriteOptions},
file::{FileMeta, Level},
meta_data::{SstMetaData, SstMetaReader},
- writer::{MetaData, RecordBatchStream},
+ writer::MetaData,
},
table::{
data::{self, TableData, TableDataRef},
@@ -367,7 +367,7 @@ impl FlushTask {
let mut last_sequence = table_data.last_sequence();
// Switch (freeze) all mutable memtables. And update segment duration
if
// suggestion is returned.
- let mut need_reorder = false;
+ let mut need_reorder = table_data.enable_layered_memtable;
if let Some(suggest_segment_duration) =
current_version.suggest_duration() {
info!(
"Update segment duration, table:{}, table_id:{},
segment_duration:{:?}",
@@ -483,7 +483,9 @@ impl FlushTask {
}
}
for mem in &mems_to_flush.memtables {
- let file = self.dump_normal_memtable(request_id.clone(),
mem).await?;
+ let file = self
+ .dump_normal_memtable(request_id.clone(), mem, need_reorder)
+ .await?;
if let Some(file) = file {
let sst_size = file.size;
files_to_level0.push(AddFile {
@@ -728,6 +730,7 @@ impl FlushTask {
&self,
request_id: RequestId,
memtable_state: &MemTableState,
+ need_reorder: bool,
) -> Result<Option<FileMeta>> {
let (min_key, max_key) = match (memtable_state.mem.min_key(),
memtable_state.mem.max_key())
{
@@ -778,8 +781,24 @@ impl FlushTask {
let iter = build_mem_table_iter(memtable_state.mem.clone(),
&self.table_data)?;
- let record_batch_stream: RecordBatchStream =
- Box::new(stream::iter(iter).map_err(|e| Box::new(e) as _));
+ let record_batch_stream = if need_reorder {
+ let schema = self.table_data.schema();
+ let primary_key_indexes = schema.primary_key_indexes().to_vec();
+ let reorder = Reorder {
+ iter,
+ schema,
+ order_by_col_indexes: primary_key_indexes,
+ };
+ Box::new(
+ reorder
+ .into_stream()
+ .await
+ .context(ReorderMemIter)?
+ .map(|batch| batch.box_err()),
+ ) as _
+ } else {
+ Box::new(stream::iter(iter).map(|batch| batch.box_err())) as _
+ };
let sst_info = writer
.write(request_id, &sst_meta, record_batch_stream)
@@ -1231,6 +1250,7 @@ fn build_mem_table_iter(
need_dedup: table_data.dedup(),
reverse: false,
metrics_collector: None,
+ time_range: TimeRange::min_to_max(),
};
memtable
.scan(scan_ctx, scan_req)
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 68f79683..43515a6e 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -83,6 +83,11 @@ pub struct Config {
/// The ratio of table's write buffer size to trigger preflush, and it
/// should be in the range (0, 1].
pub preflush_write_buffer_size_ratio: f32,
+
+ /// The threshold to trigger switching mutable segment of memtable.
+ /// If it is zero, disable the layered memtable.
+ pub mutable_segment_switch_threshold: ReadableSize,
+
pub enable_primary_key_sampling: bool,
// Iterator scanning options
@@ -200,6 +205,7 @@ impl Default for Config {
remote_engine_client:
remote_engine_client::config::Config::default(),
recover_mode: RecoverMode::TableBased,
metrics: MetricsOptions::default(),
+ mutable_segment_switch_threshold: ReadableSize::mb(3),
}
}
}
diff --git a/analytic_engine/src/memtable/columnar/factory.rs
b/analytic_engine/src/memtable/columnar/factory.rs
index 1e8f0604..6ce4f996 100644
--- a/analytic_engine/src/memtable/columnar/factory.rs
+++ b/analytic_engine/src/memtable/columnar/factory.rs
@@ -24,10 +24,9 @@ use std::{
use crate::memtable::{
columnar::ColumnarMemTable,
- factory::{Factory, Options, Result},
- MemTableRef,
+ factory::{Factory, Options},
+ MemTableRef, Result,
};
-
/// Factory to create memtable
#[derive(Debug)]
pub struct ColumnarMemTableFactory;
diff --git a/analytic_engine/src/memtable/factory.rs
b/analytic_engine/src/memtable/factory.rs
index 97a7bd43..b6952e6a 100644
--- a/analytic_engine/src/memtable/factory.rs
+++ b/analytic_engine/src/memtable/factory.rs
@@ -18,15 +18,8 @@ use std::{fmt, sync::Arc};
use arena::CollectorRef;
use common_types::{schema::Schema, SequenceNumber};
-use macros::define_result;
-use snafu::Snafu;
-use crate::memtable::MemTableRef;
-
-#[derive(Debug, Snafu)]
-pub enum Error {}
-
-define_result!(Error);
+use crate::memtable::{MemTableRef, Result};
/// MemTable options
#[derive(Clone)]
diff --git a/analytic_engine/src/memtable/skiplist/factory.rs
b/analytic_engine/src/memtable/layered/factory.rs
similarity index 52%
copy from analytic_engine/src/memtable/skiplist/factory.rs
copy to analytic_engine/src/memtable/layered/factory.rs
index 1e6e8b1d..03c793b9 100644
--- a/analytic_engine/src/memtable/skiplist/factory.rs
+++ b/analytic_engine/src/memtable/layered/factory.rs
@@ -14,31 +14,38 @@
//! Skiplist memtable factory
-use std::sync::{atomic::AtomicU64, Arc};
-
-use arena::MonoIncArena;
-use skiplist::{BytewiseComparator, Skiplist};
+use std::sync::Arc;
use crate::memtable::{
- factory::{Factory, Options, Result},
- skiplist::SkiplistMemTable,
- MemTableRef,
+ factory::{Factory, FactoryRef, Options},
+ layered::LayeredMemTable,
+ MemTableRef, Result,
};
/// Factory to create memtable
#[derive(Debug)]
-pub struct SkiplistMemTableFactory;
+pub struct LayeredMemtableFactory {
+ inner_memtable_factory: FactoryRef,
+ mutable_switch_threshold: usize,
+}
+
+impl LayeredMemtableFactory {
+ pub fn new(inner_memtable_factory: FactoryRef, mutable_switch_threshold:
usize) -> Self {
+ Self {
+ inner_memtable_factory,
+ mutable_switch_threshold,
+ }
+ }
+}
-impl Factory for SkiplistMemTableFactory {
+impl Factory for LayeredMemtableFactory {
fn create_memtable(&self, opts: Options) -> Result<MemTableRef> {
- let arena = MonoIncArena::with_collector(opts.arena_block_size as
usize, opts.collector);
- let skiplist = Skiplist::with_arena(BytewiseComparator, arena);
- let memtable = Arc::new(SkiplistMemTable::new(
- opts.schema,
- skiplist,
- AtomicU64::new(opts.creation_sequence),
- ));
+ let memtable = LayeredMemTable::new(
+ &opts,
+ self.inner_memtable_factory.clone(),
+ self.mutable_switch_threshold,
+ )?;
- Ok(memtable)
+ Ok(Arc::new(memtable))
}
}
diff --git a/analytic_engine/src/memtable/layered/iter.rs
b/analytic_engine/src/memtable/layered/iter.rs
new file mode 100644
index 00000000..6e1f3030
--- /dev/null
+++ b/analytic_engine/src/memtable/layered/iter.rs
@@ -0,0 +1,120 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Skiplist memtable iterator
+
+use common_types::{record_batch::FetchedRecordBatch, schema::Schema,
time::TimeRange};
+use generic_error::BoxError;
+use snafu::ResultExt;
+
+use crate::memtable::{
+ layered::{ImmutableSegment, MutableSegment},
+ ColumnarIterPtr, Internal, ProjectSchema, Result, ScanContext, ScanRequest,
+};
+
+/// Columnar iterator for [LayeredMemTable]
+pub(crate) struct ColumnarIterImpl {
+ selected_batch_iter: ColumnarIterPtr,
+}
+
+impl ColumnarIterImpl {
+ pub fn new(
+ memtable_schema: &Schema,
+ ctx: ScanContext,
+ request: ScanRequest,
+ mutable: &MutableSegment,
+ immutables: &[ImmutableSegment],
+ ) -> Result<Self> {
+ // Create projection for the memtable schema
+ let row_projector = request
+ .row_projector_builder
+ .build(memtable_schema)
+ .context(ProjectSchema)?;
+
+ let (maybe_mutable, selected_immutables) =
+ Self::filter_by_time_range(mutable, immutables,
request.time_range);
+
+ let immutable_batches = selected_immutables
+ .flat_map(|imm| {
+ imm.record_batches().iter().map(|batch| {
+ // TODO: reduce clone here.
+ let fetched_schema =
row_projector.fetched_schema().clone();
+ let primary_key_indexes = row_projector
+ .primary_key_indexes()
+ .map(|idxs| idxs.to_vec());
+ let fetched_column_indexes =
row_projector.fetched_source_column_indexes();
+ FetchedRecordBatch::try_new(
+ fetched_schema,
+ primary_key_indexes,
+ fetched_column_indexes,
+ batch.clone(),
+ )
+ .box_err()
+ .with_context(|| Internal {
+ msg: format!("row_projector:{row_projector:?}",),
+ })
+ })
+ })
+ .collect::<Vec<_>>();
+ let immutable_iter = immutable_batches.into_iter();
+
+ let maybe_mutable_iter = match maybe_mutable {
+ Some(mutable) => Some(mutable.scan(ctx, request)?),
+ None => None,
+ };
+
+ let maybe_chained_iter = match maybe_mutable_iter {
+ Some(mutable_iter) => Box::new(mutable_iter.chain(immutable_iter))
as _,
+ None => Box::new(immutable_iter) as _,
+ };
+
+ Ok(Self {
+ selected_batch_iter: maybe_chained_iter,
+ })
+ }
+
+ fn filter_by_time_range<'a>(
+ mutable: &'a MutableSegment,
+ immutables: &'a [ImmutableSegment],
+ time_range: TimeRange,
+ ) -> (
+ Option<&'a MutableSegment>,
+ impl Iterator<Item = &'a ImmutableSegment>,
+ ) {
+ let maybe_mutable = {
+ let mutable_time_range = mutable.time_range();
+ mutable_time_range.and_then(|range| {
+ if range.intersect_with(time_range) {
+ Some(mutable)
+ } else {
+ None
+ }
+ })
+ };
+
+ let selected_immutables = immutables
+ .iter()
+ .filter(move |imm| imm.time_range().intersect_with(time_range));
+
+ (maybe_mutable, selected_immutables)
+ }
+}
+
+impl Iterator for ColumnarIterImpl {
+ type Item = Result<FetchedRecordBatch>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.selected_batch_iter.next()
+ }
+}
diff --git a/analytic_engine/src/memtable/layered/mod.rs
b/analytic_engine/src/memtable/layered/mod.rs
new file mode 100644
index 00000000..92087e1e
--- /dev/null
+++ b/analytic_engine/src/memtable/layered/mod.rs
@@ -0,0 +1,729 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! MemTable based on skiplist
+
+pub mod factory;
+pub mod iter;
+
+use std::{
+ mem,
+ ops::{Bound, Deref},
+ sync::{
+ atomic::{self, AtomicU64},
+ RwLock,
+ },
+};
+
+use arena::CollectorRef;
+use arrow::record_batch::RecordBatch as ArrowRecordBatch;
+use bytes_ext::Bytes;
+use common_types::{
+ projected_schema::RowProjectorBuilder, row::Row, schema::Schema,
time::TimeRange,
+ SequenceNumber,
+};
+use generic_error::BoxError;
+use logger::debug;
+use skiplist::{BytewiseComparator, KeyComparator};
+use snafu::{OptionExt, ResultExt};
+
+use crate::memtable::{
+ factory::{FactoryRef, Options},
+ key::KeySequence,
+ layered::iter::ColumnarIterImpl,
+ ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics
as MemtableMetrics,
+ PutContext, Result, ScanContext, ScanRequest,
+};
+
+/// MemTable implementation based on skiplist
+pub(crate) struct LayeredMemTable {
+ /// Schema of this memtable, is immutable.
+ schema: Schema,
+
+ /// The last sequence of the rows in this memtable. Update to this field
+ /// require external synchronization.
+ last_sequence: AtomicU64,
+
+ inner: RwLock<Inner>,
+
+ mutable_switch_threshold: usize,
+}
+
+impl LayeredMemTable {
+ pub fn new(
+ opts: &Options,
+ inner_memtable_factory: FactoryRef,
+ mutable_switch_threshold: usize,
+ ) -> Result<Self> {
+ let inner = Inner::new(inner_memtable_factory, opts)?;
+
+ Ok(Self {
+ schema: opts.schema.clone(),
+ last_sequence: AtomicU64::new(opts.creation_sequence),
+ inner: RwLock::new(inner),
+ mutable_switch_threshold,
+ })
+ }
+
+ // Used for testing only
+ #[cfg(test)]
+ fn force_switch_mutable_segment(&self) -> Result<()> {
+ let inner = &mut *self.inner.write().unwrap();
+ inner.switch_mutable_segment(self.schema.clone())
+ }
+}
+
+impl MemTable for LayeredMemTable {
+ fn schema(&self) -> &Schema {
+ &self.schema
+ }
+
+ fn min_key(&self) -> Option<Bytes> {
+ self.inner.read().unwrap().min_key()
+ }
+
+ fn max_key(&self) -> Option<Bytes> {
+ self.inner.read().unwrap().max_key()
+ }
+
+ fn put(
+ &self,
+ ctx: &mut PutContext,
+ sequence: KeySequence,
+ row: &Row,
+ schema: &Schema,
+ ) -> Result<()> {
+ let memory_usage = {
+ let inner = self.inner.read().unwrap();
+ inner.put(ctx, sequence, row, schema)?;
+ inner.mutable_segment.0.approximate_memory_usage()
+ };
+
+ if memory_usage > self.mutable_switch_threshold {
+ debug!(
+ "LayeredMemTable put, memory_usage:{memory_usage},
mutable_switch_threshold:{}",
+ self.mutable_switch_threshold
+ );
+ let inner = &mut *self.inner.write().unwrap();
+ inner.switch_mutable_segment(self.schema.clone())?;
+ }
+
+ Ok(())
+ }
+
+ fn scan(&self, ctx: ScanContext, request: ScanRequest) ->
Result<ColumnarIterPtr> {
+ let inner = self.inner.read().unwrap();
+ inner.scan(&self.schema, ctx, request)
+ }
+
+ fn approximate_memory_usage(&self) -> usize {
+ self.inner.read().unwrap().approximate_memory_usage()
+ }
+
+ fn set_last_sequence(&self, sequence: SequenceNumber) -> Result<()> {
+ self.last_sequence
+ .store(sequence, atomic::Ordering::Relaxed);
+ Ok(())
+ }
+
+ fn last_sequence(&self) -> SequenceNumber {
+ self.last_sequence.load(atomic::Ordering::Relaxed)
+ }
+
+ fn time_range(&self) -> Option<TimeRange> {
+ let inner = self.inner.read().unwrap();
+ inner.time_range()
+ }
+
+ fn metrics(&self) -> MemtableMetrics {
+ // FIXME: stats and return metrics
+ MemtableMetrics::default()
+ }
+}
+
+/// Layered memtable inner
+struct Inner {
+ mutable_segment_builder: MutableSegmentBuilder,
+ mutable_segment: MutableSegment,
+ immutable_segments: Vec<ImmutableSegment>,
+}
+
+impl Inner {
+ fn new(memtable_factory: FactoryRef, opts: &Options) -> Result<Self> {
+ let builder_opts = MutableBuilderOptions {
+ schema: opts.schema.clone(),
+ arena_block_size: opts.arena_block_size,
+ collector: opts.collector.clone(),
+ };
+ let mutable_segment_builder =
MutableSegmentBuilder::new(memtable_factory, builder_opts);
+
+ // Build the first mutable batch.
+ let init_mutable_segment = mutable_segment_builder.build()?;
+
+ Ok(Self {
+ mutable_segment_builder,
+ mutable_segment: init_mutable_segment,
+ immutable_segments: vec![],
+ })
+ }
+
+ /// Scan batches including `mutable` and `immutable`s.
+ #[inline]
+ fn scan(
+ &self,
+ schema: &Schema,
+ ctx: ScanContext,
+ request: ScanRequest,
+ ) -> Result<ColumnarIterPtr> {
+ let iter = ColumnarIterImpl::new(
+ schema,
+ ctx,
+ request,
+ &self.mutable_segment,
+ &self.immutable_segments,
+ )?;
+ Ok(Box::new(iter))
+ }
+
+ #[inline]
+ fn put(
+ &self,
+ ctx: &mut PutContext,
+ sequence: KeySequence,
+ row: &Row,
+ schema: &Schema,
+ ) -> Result<()> {
+ self.mutable_segment.put(ctx, sequence, row, schema)
+ }
+
+ fn switch_mutable_segment(&mut self, schema: Schema) -> Result<()> {
+ let imm_num = self.immutable_segments.len();
+ debug!("LayeredMemTable switch_mutable_segment, imm_num:{imm_num}");
+
+ // Build a new mutable segment, and replace current's.
+ let new_mutable = self.mutable_segment_builder.build()?;
+ let current_mutable = mem::replace(&mut self.mutable_segment,
new_mutable);
+ let fetched_schema = schema.to_record_schema();
+
+ // Convert current's to immutable.
+ let scan_ctx = ScanContext::default();
+ let row_projector_builder = RowProjectorBuilder::new(fetched_schema,
schema, None);
+ let scan_req = ScanRequest {
+ start_user_key: Bound::Unbounded,
+ end_user_key: Bound::Unbounded,
+ sequence: common_types::MAX_SEQUENCE_NUMBER,
+ need_dedup: false,
+ reverse: false,
+ metrics_collector: None,
+ time_range: TimeRange::min_to_max(),
+ row_projector_builder,
+ };
+
+ let immutable_batches = current_mutable
+ .scan(scan_ctx, scan_req)?
+ .map(|batch_res| batch_res.map(|batch|
batch.into_arrow_record_batch()))
+ .collect::<Result<Vec<_>>>()?;
+
+ let time_range = current_mutable.time_range().context(InternalNoCause {
+ msg: "failed to get time range from mutable segment",
+ })?;
+ let max_key = current_mutable.max_key().context(InternalNoCause {
+ msg: "failed to get max key from mutable segment",
+ })?;
+ let min_key = current_mutable.min_key().context(InternalNoCause {
+ msg: "failed to get min key from mutable segment",
+ })?;
+ let immutable = ImmutableSegment::new(immutable_batches, time_range,
min_key, max_key);
+
+ self.immutable_segments.push(immutable);
+
+ Ok(())
+ }
+
+ pub fn min_key(&self) -> Option<Bytes> {
+ let comparator = BytewiseComparator;
+
+ let mutable_min_key = self.mutable_segment.min_key();
+
+ let immutable_min_key = if self.immutable_segments.is_empty() {
+ None
+ } else {
+ let mut min_key =
self.immutable_segments.first().unwrap().min_key();
+ let mut imm_iter = self.immutable_segments.iter();
+ let _ = imm_iter.next();
+ for imm in imm_iter {
+ if let std::cmp::Ordering::Greater =
comparator.compare_key(&min_key, &imm.min_key)
+ {
+ min_key = imm.min_key();
+ }
+ }
+
+ Some(min_key)
+ };
+
+ match (mutable_min_key, immutable_min_key) {
+ (None, None) => None,
+ (None, Some(key)) | (Some(key), None) => Some(key),
+ (Some(key1), Some(key2)) => Some(match
comparator.compare_key(&key1, &key2) {
+ std::cmp::Ordering::Greater => key2,
+ std::cmp::Ordering::Less | std::cmp::Ordering::Equal => key1,
+ }),
+ }
+ }
+
+ pub fn max_key(&self) -> Option<Bytes> {
+ let comparator = BytewiseComparator;
+
+ let mutable_max_key = self.mutable_segment.max_key();
+
+ let immutable_max_key = if self.immutable_segments.is_empty() {
+ None
+ } else {
+ let mut max_key =
self.immutable_segments.first().unwrap().max_key();
+ let mut imm_iter = self.immutable_segments.iter();
+ let _ = imm_iter.next();
+ for imm in imm_iter {
+ if let std::cmp::Ordering::Less =
comparator.compare_key(&max_key, &imm.max_key) {
+ max_key = imm.max_key();
+ }
+ }
+
+ Some(max_key)
+ };
+
+ match (mutable_max_key, immutable_max_key) {
+ (None, None) => None,
+ (None, Some(key)) | (Some(key), None) => Some(key),
+ (Some(key1), Some(key2)) => Some(match
comparator.compare_key(&key1, &key2) {
+ std::cmp::Ordering::Less => key2,
+ std::cmp::Ordering::Greater | std::cmp::Ordering::Equal =>
key1,
+ }),
+ }
+ }
+
+ pub fn time_range(&self) -> Option<TimeRange> {
+ let mutable_time_range = self.mutable_segment.time_range();
+
+ let immutable_time_range = if self.immutable_segments.is_empty() {
+ None
+ } else {
+ let mut time_range =
self.immutable_segments.first().unwrap().time_range();
+ let mut imm_iter = self.immutable_segments.iter();
+ let _ = imm_iter.next();
+ for imm in imm_iter {
+ time_range = time_range.merge_range(imm.time_range());
+ }
+
+ Some(time_range)
+ };
+
+ match (mutable_time_range, immutable_time_range) {
+ (None, None) => None,
+ (None, Some(range)) | (Some(range), None) => Some(range),
+ (Some(range1), Some(range2)) => Some(range1.merge_range(range2)),
+ }
+ }
+
+ fn approximate_memory_usage(&self) -> usize {
+ let mutable_mem_usage =
self.mutable_segment.approximate_memory_usage();
+
+ let immutable_mem_usage = self
+ .immutable_segments
+ .iter()
+ .map(|imm| imm.approximate_memory_usage())
+ .sum::<usize>();
+
+ mutable_mem_usage + immutable_mem_usage
+ }
+}
+
+/// Mutable batch
+pub(crate) struct MutableSegment(MemTableRef);
+
+impl Deref for MutableSegment {
+ type Target = MemTableRef;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+/// Builder for `MutableBatch`
+struct MutableSegmentBuilder {
+ memtable_factory: FactoryRef,
+ opts: MutableBuilderOptions,
+}
+
+impl MutableSegmentBuilder {
+ fn new(memtable_factory: FactoryRef, opts: MutableBuilderOptions) -> Self {
+ Self {
+ memtable_factory,
+ opts,
+ }
+ }
+
+ fn build(&self) -> Result<MutableSegment> {
+ let memtable_opts = Options {
+ schema: self.opts.schema.clone(),
+ arena_block_size: self.opts.arena_block_size,
+ // `creation_sequence` is meaningless in inner memtable, just set
it to min.
+ creation_sequence: SequenceNumber::MIN,
+ collector: self.opts.collector.clone(),
+ };
+
+ let memtable = self
+ .memtable_factory
+ .create_memtable(memtable_opts)
+ .box_err()
+ .context(Internal {
+ msg: "failed to build mutable segment",
+ })?;
+
+ Ok(MutableSegment(memtable))
+ }
+}
+
+struct MutableBuilderOptions {
+ pub schema: Schema,
+
+ /// Block size of arena in bytes.
+ pub arena_block_size: u32,
+
+ /// Memory usage collector
+ pub collector: CollectorRef,
+}
+
+/// Immutable batch
+pub(crate) struct ImmutableSegment {
+ /// Record batch converted from `MutableBatch`
+ record_batches: Vec<ArrowRecordBatch>,
+
+ /// Min time of source `MutableBatch`
+ time_range: TimeRange,
+
+ /// Min key of source `MutableBatch`
+ min_key: Bytes,
+
+ /// Max key of source `MutableBatch`
+ max_key: Bytes,
+
+ approximate_memory_size: usize,
+}
+
+impl ImmutableSegment {
+ fn new(
+ record_batches: Vec<ArrowRecordBatch>,
+ time_range: TimeRange,
+ min_key: Bytes,
+ max_key: Bytes,
+ ) -> Self {
+ let approximate_memory_size = record_batches
+ .iter()
+ .map(|batch| batch.get_array_memory_size())
+ .sum();
+
+ Self {
+ record_batches,
+ time_range,
+ min_key,
+ max_key,
+ approximate_memory_size,
+ }
+ }
+
+ pub fn time_range(&self) -> TimeRange {
+ self.time_range
+ }
+
+ pub fn min_key(&self) -> Bytes {
+ self.min_key.clone()
+ }
+
+ pub fn max_key(&self) -> Bytes {
+ self.max_key.clone()
+ }
+
+ // TODO: maybe return a iterator?
+ pub fn record_batches(&self) -> &[ArrowRecordBatch] {
+ &self.record_batches
+ }
+
+ pub fn approximate_memory_usage(&self) -> usize {
+ self.approximate_memory_size
+ }
+}
+
+#[cfg(test)]
+mod tests {
+
+ use std::{ops::Bound, sync::Arc};
+
+ use arena::NoopCollector;
+ use bytes_ext::ByteVec;
+ use codec::{memcomparable::MemComparable, Encoder};
+ use common_types::{
+ datum::Datum,
+ projected_schema::{ProjectedSchema, RowProjectorBuilder},
+ record_batch::FetchedRecordBatch,
+ row::Row,
+ schema::IndexInWriterSchema,
+ tests::{build_row, build_schema},
+ };
+
+ use super::*;
+ use crate::memtable::{
+ factory::Options,
+ key::ComparableInternalKey,
+ skiplist::factory::SkiplistMemTableFactory,
+ test_util::{TestMemtableBuilder, TestUtil},
+ MemTableRef,
+ };
+
+ struct TestMemtableBuilderImpl;
+
+ impl TestMemtableBuilder for TestMemtableBuilderImpl {
+ fn build(&self, data: &[(KeySequence, Row)]) -> MemTableRef {
+ let schema = build_schema();
+ let factory = SkiplistMemTableFactory;
+ let opts = Options {
+ schema: schema.clone(),
+ arena_block_size: 512,
+ creation_sequence: 1,
+ collector: Arc::new(NoopCollector {}),
+ };
+ let memtable = LayeredMemTable::new(&opts, Arc::new(factory),
usize::MAX).unwrap();
+
+ let mut ctx =
+
PutContext::new(IndexInWriterSchema::for_same_schema(schema.num_columns()));
+ let partitioned_data = data.chunks(3).collect::<Vec<_>>();
+ let chunk_num = partitioned_data.len();
+
+ for chunk in partitioned_data.iter().take(chunk_num - 1) {
+ for (seq, row) in *chunk {
+ memtable.put(&mut ctx, *seq, row, &schema).unwrap();
+ }
+ memtable.force_switch_mutable_segment().unwrap();
+ }
+
+ let last_chunk = partitioned_data[chunk_num - 1];
+ for (seq, row) in last_chunk {
+ memtable.put(&mut ctx, *seq, row, &schema).unwrap();
+ }
+
+ Arc::new(memtable)
+ }
+ }
+
+ // Seven test rows with ascending keys "a".."g" and ascending timestamps
+ // 1..=7, written under sequences 1..=3.
+ fn test_data() -> Vec<(KeySequence, Row)> {
+ vec![
+ (
+ KeySequence::new(1, 1),
+ build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
+ ),
+ (
+ KeySequence::new(1, 2),
+ build_row(b"b", 2, 10.0, "v2", 2000, 2_000_000),
+ ),
+ (
+ KeySequence::new(1, 4),
+ build_row(b"c", 3, 10.0, "v3", 3000, 3_000_000),
+ ),
+ (
+ KeySequence::new(2, 1),
+ build_row(b"d", 4, 10.0, "v4", 4000, 4_000_000),
+ ),
+ (
+ KeySequence::new(2, 1),
+ build_row(b"e", 5, 10.0, "v5", 5000, 5_000_000),
+ ),
+ (
+ KeySequence::new(2, 3),
+ build_row(b"f", 6, 10.0, "v6", 6000, 6_000_000),
+ ),
+ (
+ KeySequence::new(3, 4),
+ build_row(b"g", 7, 10.0, "v7", 7000, 7_000_000),
+ ),
+ ]
+ }
+
+ #[test]
+ fn test_memtable_scan() {
+ let builder = TestMemtableBuilderImpl;
+ let data = test_data();
+ let test_util = TestUtil::new(builder, data);
+ let memtable = test_util.memtable();
+ let schema = memtable.schema().clone();
+
+ // Case 1: full projection over the unbounded time range; every row
+ // must come back.
+ let projection = (0..schema.num_columns()).collect::<Vec<_>>();
+ let expected = test_util.data();
+ test_memtable_scan_internal(
+ schema.clone(),
+ projection,
+ TimeRange::min_to_max(),
+ memtable.clone(),
+ expected,
+ );
+
+ // Case 2: project a subset of the columns (indexes 0, 1 and 3).
+ let projection = vec![0, 1, 3];
+ let expected = test_util
+ .data()
+ .iter()
+ .map(|row| {
+ let datums = vec![row[0].clone(), row[1].clone(),
row[3].clone()];
+ Row::from_datums(datums)
+ })
+ .collect();
+ test_memtable_scan_internal(
+ schema.clone(),
+ projection,
+ TimeRange::min_to_max(),
+ memtable.clone(),
+ expected,
+ );
+
+ // Case 3: full projection, but with a restricted time range.
+ let projection = (0..schema.num_columns()).collect::<Vec<_>>();
+ let time_range = TimeRange::new(2.into(), 7.into()).unwrap();
+ // Memtable data after segment switching, showing only the timestamp
+ // column used for filtering: [1, 2, 3], [4, 5, 6], [7]
+ //
+ // The target time range is [2, 7). The expectation below keeps the
+ // first six rows (timestamp 1 included), which implies the filter
+ // works at segment granularity: whole segments overlapping the range
+ // are returned — TODO confirm against the layered scan impl.
+ let expected = test_util
+ .data()
+ .iter()
+ .enumerate()
+ .filter_map(|(idx, row)| if idx < 6 { Some(row.clone()) } else {
None })
+ .collect();
+ test_memtable_scan_internal(
+ schema.clone(),
+ projection,
+ time_range,
+ memtable.clone(),
+ expected,
+ );
+ }
+
+ #[test]
+ fn test_time_range() {
+ let builder = TestMemtableBuilderImpl;
+ let data = test_data();
+ let test_util = TestUtil::new(builder, data);
+ let memtable = test_util.memtable();
+
+ // Timestamps in `test_data` span 1..=7, so the covering half-open
+ // range reported by the memtable must be [1, 8).
+ assert_eq!(TimeRange::new(1.into(), 8.into()), memtable.time_range());
+ }
+
+ #[test]
+ fn test_min_max_key() {
+ let builder = TestMemtableBuilderImpl;
+ let data = test_data();
+ let test_util = TestUtil::new(builder, data.clone());
+ let memtable = test_util.memtable();
+ let schema = memtable.schema();
+
+ // Expected min key: encoded from the first input row (smallest key).
+ // NOTE: the original test encoded each key twice with identical code;
+ // the duplicated blocks were dead work and have been removed.
+ let key_encoder = ComparableInternalKey::new(data[0].0, schema);
+ let mut min_key = Vec::with_capacity(key_encoder.estimate_encoded_size(&data[0].1));
+ key_encoder.encode(&mut min_key, &data[0].1).unwrap();
+
+ // Expected max key: encoded from the last input row (largest key).
+ let key_encoder = ComparableInternalKey::new(data[6].0, schema);
+ let mut max_key = Vec::with_capacity(key_encoder.estimate_encoded_size(&data[6].1));
+ key_encoder.encode(&mut max_key, &data[6].1).unwrap();
+
+ assert_eq!(min_key, memtable.min_key().unwrap().to_vec());
+ assert_eq!(max_key, memtable.max_key().unwrap().to_vec());
+ }
+
+ // Scans `memtable` with the given projection and time range and asserts
+ // the returned rows match `expected` (order-insensitive; see
+ // `check_iterator`, which sorts before comparing).
+ fn test_memtable_scan_internal(
+ schema: Schema,
+ projection: Vec<usize>,
+ time_range: TimeRange,
+ memtable: Arc<dyn MemTable + Send + Sync>,
+ expected: Vec<Row>,
+ ) {
+ let projected_schema = ProjectedSchema::new(schema,
Some(projection)).unwrap();
+ let fetched_schema = projected_schema.to_record_schema();
+ let table_schema = projected_schema.table_schema();
+ let row_projector_builder =
+ RowProjectorBuilder::new(fetched_schema, table_schema.clone(),
None);
+
+ // Scan with the maximum sequence so every write is visible; rows are
+ // restricted only by `time_range` and the projection.
+ let scan_request = ScanRequest {
+ start_user_key: Bound::Unbounded,
+ end_user_key: Bound::Unbounded,
+ sequence: SequenceNumber::MAX,
+ row_projector_builder,
+ need_dedup: false,
+ reverse: false,
+ metrics_collector: None,
+ time_range,
+ };
+ let scan_ctx = ScanContext::default();
+ let iter = memtable.scan(scan_ctx, scan_request).unwrap();
+ check_iterator(iter, expected);
+ }
+
+ // Drains `iter` into rows and compares against `expected_rows`.
+ fn check_iterator<T: Iterator<Item = Result<FetchedRecordBatch>>>(
+ iter: T,
+ expected_rows: Vec<Row>,
+ ) {
+ // Collect all rows first, then sort by the memcomparable scan key,
+ // since row order across batches from different segments is not
+ // guaranteed here.
+ let mut rows = Vec::new();
+ for batch in iter {
+ let batch = batch.unwrap();
+ for row_idx in 0..batch.num_rows() {
+ rows.push(batch.clone_row_at(row_idx));
+ }
+ }
+
+ rows.sort_by(|a, b| {
+ let key1 = build_scan_key(
+ &String::from_utf8_lossy(a[0].as_varbinary().unwrap()),
+ a[1].as_timestamp().unwrap().as_i64(),
+ );
+ let key2 = build_scan_key(
+ &String::from_utf8_lossy(b[0].as_varbinary().unwrap()),
+ b[1].as_timestamp().unwrap().as_i64(),
+ );
+ BytewiseComparator.compare_key(&key1, &key2)
+ });
+
+ assert_eq!(rows, expected_rows);
+ }
+
+ // Encodes the two key-column values into a single memcomparable byte key
+ // used for ordering rows in `check_iterator`.
+ fn build_scan_key(c1: &str, c2: i64) -> Bytes {
+ let mut buf = ByteVec::new();
+ let encoder = MemComparable;
+ encoder.encode(&mut buf, &Datum::from(c1)).unwrap();
+ encoder.encode(&mut buf, &Datum::from(c2)).unwrap();
+
+ Bytes::from(buf)
+ }
+}
diff --git a/analytic_engine/src/memtable/mod.rs
b/analytic_engine/src/memtable/mod.rs
index ed3b20d3..906936bb 100644
--- a/analytic_engine/src/memtable/mod.rs
+++ b/analytic_engine/src/memtable/mod.rs
@@ -17,24 +17,28 @@
pub mod columnar;
pub mod factory;
pub mod key;
+pub mod layered;
mod reversed_iter;
pub mod skiplist;
+pub mod test_util;
-use std::{ops::Bound, sync::Arc, time::Instant};
+use std::{collections::HashMap, ops::Bound, sync::Arc, time::Instant};
use bytes_ext::{ByteVec, Bytes};
+use ceresdbproto::manifest;
use common_types::{
projected_schema::RowProjectorBuilder,
record_batch::FetchedRecordBatch,
row::Row,
schema::{IndexInWriterSchema, Schema},
time::TimeRange,
- SequenceNumber,
+ SequenceNumber, MUTABLE_SEGMENT_SWITCH_THRESHOLD,
};
-use generic_error::GenericError;
+use generic_error::{BoxError, GenericError};
use macros::define_result;
use serde::{Deserialize, Serialize};
-use snafu::{Backtrace, Snafu};
+use size_ext::ReadableSize;
+use snafu::{Backtrace, ResultExt, Snafu};
use trace_metric::MetricsCollector;
use crate::memtable::key::KeySequence;
@@ -46,13 +50,13 @@ const MEMTABLE_TYPE_COLUMNAR: &str = "columnar";
#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
pub enum MemtableType {
SkipList,
- Columnar,
+ Column,
}
impl MemtableType {
pub fn parse_from(s: &str) -> Self {
if s.eq_ignore_ascii_case(MEMTABLE_TYPE_COLUMNAR) {
- MemtableType::Columnar
+ MemtableType::Column
} else {
MemtableType::SkipList
}
@@ -63,7 +67,54 @@ impl ToString for MemtableType {
fn to_string(&self) -> String {
match self {
MemtableType::SkipList => MEMTABLE_TYPE_SKIPLIST.to_string(),
- MemtableType::Columnar => MEMTABLE_TYPE_COLUMNAR.to_string(),
+ MemtableType::Column => MEMTABLE_TYPE_COLUMNAR.to_string(),
+ }
+ }
+}
+
+/// Options for the layered memtable.
+///
+/// If `mutable_segment_switch_threshold` is set to zero, the layered memtable
+/// is disabled.
+#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+#[serde(default)]
+pub struct LayeredMemtableOptions {
+ pub mutable_segment_switch_threshold: ReadableSize,
+}
+
+impl Default for LayeredMemtableOptions {
+ fn default() -> Self {
+ Self {
+ // Default threshold: 3 MiB.
+ mutable_segment_switch_threshold: ReadableSize::mb(3),
+ }
+ }
+}
+
+impl LayeredMemtableOptions {
+ /// Builds options from the raw table-option map, starting from the
+ /// defaults and overriding the switch threshold when the
+ /// `mutable_segment_switch_threshold` key is present.
+ ///
+ /// The value must be a plain `u64` byte count (no size suffix); an
+ /// invalid value yields an `Internal` error.
+ pub fn parse_from(opts: &HashMap<String, String>) -> Result<Self> {
+ let mut options = LayeredMemtableOptions::default();
+ if let Some(v) = opts.get(MUTABLE_SEGMENT_SWITCH_THRESHOLD) {
+ let threshold = v.parse::<u64>().box_err().context(Internal {
+ msg: format!("invalid mutable segment switch threshold:{v}"),
+ })?;
+ options.mutable_segment_switch_threshold = ReadableSize(threshold);
+ }
+
+ Ok(options)
+ }
+}
+
+// Conversion from the persisted manifest protobuf representation.
+impl From<manifest::LayeredMemtableOptions> for LayeredMemtableOptions {
+ fn from(value: manifest::LayeredMemtableOptions) -> Self {
+ Self {
+ mutable_segment_switch_threshold:
ReadableSize(value.mutable_segment_switch_threshold),
+ }
+ }
+}
+
+// Conversion into the manifest protobuf representation for persistence.
+impl From<LayeredMemtableOptions> for manifest::LayeredMemtableOptions {
+ fn from(value: LayeredMemtableOptions) -> Self {
+ Self {
+ mutable_segment_switch_threshold:
value.mutable_segment_switch_threshold.0,
}
}
}
@@ -147,6 +198,11 @@ pub enum Error {
max: usize,
backtrace: Backtrace,
},
+ #[snafu(display("Factory err, msg:{msg}, err:{source}"))]
+ Factory { msg: String, source: GenericError },
+
+ #[snafu(display("Factory err, msg:{msg}.\nBacktrace:\n{backtrace}"))]
+ FactoryNoCause { msg: String, backtrace: Backtrace },
}
pub const TOO_LARGE_MESSAGE: &str = "Memtable key length is too large";
@@ -208,6 +264,7 @@ pub struct ScanRequest {
pub reverse: bool,
/// Collector for scan metrics.
pub metrics_collector: Option<MetricsCollector>,
+ pub time_range: TimeRange,
}
/// In memory storage for table's data.
@@ -277,7 +334,7 @@ pub trait MemTable {
fn metrics(&self) -> Metrics;
}
-#[derive(Debug)]
+#[derive(Debug, Default)]
pub struct Metrics {
/// Size of original rows.
pub row_raw_size: usize,
diff --git a/analytic_engine/src/memtable/skiplist/factory.rs
b/analytic_engine/src/memtable/skiplist/factory.rs
index 1e6e8b1d..3f90bcde 100644
--- a/analytic_engine/src/memtable/skiplist/factory.rs
+++ b/analytic_engine/src/memtable/skiplist/factory.rs
@@ -20,9 +20,9 @@ use arena::MonoIncArena;
use skiplist::{BytewiseComparator, Skiplist};
use crate::memtable::{
- factory::{Factory, Options, Result},
+ factory::{Factory, Options},
skiplist::SkiplistMemTable,
- MemTableRef,
+ MemTableRef, Result,
};
/// Factory to create memtable
diff --git a/analytic_engine/src/memtable/skiplist/mod.rs
b/analytic_engine/src/memtable/skiplist/mod.rs
index a71a82a6..c0a47cf3 100644
--- a/analytic_engine/src/memtable/skiplist/mod.rs
+++ b/analytic_engine/src/memtable/skiplist/mod.rs
@@ -286,8 +286,35 @@ mod tests {
use crate::memtable::{
factory::{Factory, Options},
skiplist::factory::SkiplistMemTableFactory,
+ test_util::{TestMemtableBuilder, TestUtil},
+ MemTableRef,
};
+ // Builder producing a plain skiplist memtable pre-populated with the
+ // given rows (all rows go into the single mutable structure).
+ struct TestMemtableBuilderImpl;
+
+ impl TestMemtableBuilder for TestMemtableBuilderImpl {
+ fn build(&self, data: &[(KeySequence, Row)]) -> MemTableRef {
+ let schema = build_schema();
+ let factory = SkiplistMemTableFactory;
+ let memtable = factory
+ .create_memtable(Options {
+ schema: schema.clone(),
+ arena_block_size: 512,
+ creation_sequence: 1,
+ collector: Arc::new(NoopCollector {}),
+ })
+ .unwrap();
+
+ let mut ctx =
+
PutContext::new(IndexInWriterSchema::for_same_schema(schema.num_columns()));
+ for (seq, row) in data {
+ memtable.put(&mut ctx, *seq, row, &schema).unwrap();
+ }
+
+ memtable
+ }
+ }
+
fn test_memtable_scan_for_scan_request(
schema: Schema,
memtable: Arc<dyn MemTable + Send + Sync>,
@@ -309,6 +336,7 @@ mod tests {
need_dedup: true,
reverse: false,
metrics_collector: None,
+ time_range: TimeRange::min_to_max(),
},
vec![
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -329,6 +357,7 @@ mod tests {
need_dedup: true,
reverse: false,
metrics_collector: None,
+ time_range: TimeRange::min_to_max(),
},
vec![
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -348,6 +377,7 @@ mod tests {
need_dedup: true,
reverse: false,
metrics_collector: None,
+ time_range: TimeRange::min_to_max(),
},
vec![
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -383,6 +413,7 @@ mod tests {
need_dedup: true,
reverse: false,
metrics_collector: None,
+ time_range: TimeRange::min_to_max(),
},
vec![
build_row_for_two_column(b"a", 1),
@@ -401,19 +432,52 @@ mod tests {
#[test]
fn test_memtable_scan() {
- let schema = build_schema();
- let factory = SkiplistMemTableFactory;
- let memtable = factory
- .create_memtable(Options {
- schema: schema.clone(),
- arena_block_size: 512,
- creation_sequence: 1,
- collector: Arc::new(NoopCollector {}),
- })
- .unwrap();
-
- let mut ctx =
PutContext::new(IndexInWriterSchema::for_same_schema(schema.num_columns()));
- let input = vec![
+ let data = test_data();
+ let builder = TestMemtableBuilderImpl;
+ let test_util = TestUtil::new(builder, data);
+ let memtable = test_util.memtable();
+ let schema = memtable.schema().clone();
+
+ test_memtable_scan_for_scan_request(schema.clone(), memtable.clone());
+ test_memtable_scan_for_projection(schema, memtable);
+ }
+
+ // Asserts that `iter` yields exactly `expected_rows`, in order
+ // (skiplist scans return rows already key-ordered, so no sorting here).
+ fn check_iterator<T: Iterator<Item = Result<FetchedRecordBatch>>>(
+ iter: T,
+ expected_rows: Vec<Row>,
+ ) {
+ let mut visited_rows = 0;
+ for batch in iter {
+ let batch = batch.unwrap();
+ for row_idx in 0..batch.num_rows() {
+ assert_eq!(batch.clone_row_at(row_idx),
expected_rows[visited_rows]);
+ visited_rows += 1;
+ }
+ }
+
+ // Ensure no expected row was missed.
+ assert_eq!(visited_rows, expected_rows.len());
+ }
+
+ // Encodes the two key-column values into a single memcomparable byte key.
+ fn build_scan_key(c1: &str, c2: i64) -> Bytes {
+ let mut buf = ByteVec::new();
+ let encoder = MemComparable;
+ encoder.encode(&mut buf, &Datum::from(c1)).unwrap();
+ encoder.encode(&mut buf, &Datum::from(c2)).unwrap();
+
+ Bytes::from(buf)
+ }
+
+ // Builds a two-column row (varbinary key, timestamp) used by the
+ // projection tests.
+ pub fn build_row_for_two_column(key1: &[u8], key2: i64) -> Row {
+ let datums = vec![
+ Datum::Varbinary(Bytes::copy_from_slice(key1)),
+ Datum::Timestamp(Timestamp::new(key2)),
+ ];
+
+ Row::from_datums(datums)
+ }
+
+ fn test_data() -> Vec<(KeySequence, Row)> {
+ vec![
(
KeySequence::new(1, 1),
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -453,47 +517,6 @@ mod tests {
KeySequence::new(3, 4),
build_row(b"g", 7, 10.0, "v7", 7000, 7_000_000),
),
- ];
-
- for (seq, row) in input {
- memtable.put(&mut ctx, seq, &row, &schema).unwrap();
- }
-
- test_memtable_scan_for_scan_request(schema.clone(), memtable.clone());
- test_memtable_scan_for_projection(schema, memtable);
- }
-
- fn check_iterator<T: Iterator<Item = Result<FetchedRecordBatch>>>(
- iter: T,
- expected_rows: Vec<Row>,
- ) {
- let mut visited_rows = 0;
- for batch in iter {
- let batch = batch.unwrap();
- for row_idx in 0..batch.num_rows() {
- assert_eq!(batch.clone_row_at(row_idx),
expected_rows[visited_rows]);
- visited_rows += 1;
- }
- }
-
- assert_eq!(visited_rows, expected_rows.len());
- }
-
- fn build_scan_key(c1: &str, c2: i64) -> Bytes {
- let mut buf = ByteVec::new();
- let encoder = MemComparable;
- encoder.encode(&mut buf, &Datum::from(c1)).unwrap();
- encoder.encode(&mut buf, &Datum::from(c2)).unwrap();
-
- Bytes::from(buf)
- }
-
- pub fn build_row_for_two_column(key1: &[u8], key2: i64) -> Row {
- let datums = vec![
- Datum::Varbinary(Bytes::copy_from_slice(key1)),
- Datum::Timestamp(Timestamp::new(key2)),
- ];
-
- Row::from_datums(datums)
+ ]
}
}
diff --git a/analytic_engine/src/memtable/test_util.rs
b/analytic_engine/src/memtable/test_util.rs
new file mode 100644
index 00000000..72364b18
--- /dev/null
+++ b/analytic_engine/src/memtable/test_util.rs
@@ -0,0 +1,42 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_types::row::Row;
+
+use crate::memtable::*;
+
+/// Builds a memtable implementation pre-populated with `data`, so the same
+/// test suites can run against different memtable kinds.
+pub trait TestMemtableBuilder {
+ fn build(&self, data: &[(KeySequence, Row)]) -> MemTableRef;
+}
+
+/// Test helper holding a built memtable together with the rows it was
+/// populated from.
+pub struct TestUtil {
+ memtable: MemTableRef,
+ data: Vec<(KeySequence, Row)>,
+}
+
+impl TestUtil {
+ /// Builds the memtable from `data` via `builder` and keeps both.
+ pub fn new<B: TestMemtableBuilder>(builder: B, data: Vec<(KeySequence,
Row)>) -> Self {
+ let memtable = builder.build(&data);
+
+ Self { memtable, data }
+ }
+
+ /// Shared handle to the built memtable.
+ pub fn memtable(&self) -> MemTableRef {
+ self.memtable.clone()
+ }
+
+ /// The populated rows (without their sequences), cloned.
+ pub fn data(&self) -> Vec<Row> {
+ self.data.iter().map(|d| d.1.clone()).collect()
+ }
+}
diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs
b/analytic_engine/src/row_iter/record_batch_stream.rs
index dd0f4d13..15ecc8e3 100644
--- a/analytic_engine/src/row_iter/record_batch_stream.rs
+++ b/analytic_engine/src/row_iter/record_batch_stream.rs
@@ -256,6 +256,7 @@ pub fn stream_from_memtable(
need_dedup: ctx.need_dedup,
reverse: ctx.reverse,
metrics_collector,
+ time_range: ctx.predicate.time_range(),
};
let iter = memtable.scan(scan_ctx, scan_req).context(ScanMemtable)?;
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs
b/analytic_engine/src/sst/parquet/async_reader.rs
index 0b7ffdc2..305bc937 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -560,10 +560,21 @@ impl Stream for RecordBatchProjector {
}
projector.metrics.row_num += record_batch.num_rows();
- let projected_batch =
-
FetchedRecordBatch::try_new(&projector.row_projector, record_batch)
- .box_err()
- .context(DecodeRecordBatch {});
+ let fetched_schema =
projector.row_projector.fetched_schema().clone();
+ let primary_key_indexes = projector
+ .row_projector
+ .primary_key_indexes()
+ .map(|idxs| idxs.to_vec());
+ let fetching_column_indexes =
+
projector.row_projector.target_record_projection_remapping();
+ let projected_batch = FetchedRecordBatch::try_new(
+ fetched_schema,
+ primary_key_indexes,
+ fetching_column_indexes,
+ record_batch,
+ )
+ .box_err()
+ .context(DecodeRecordBatch {});
Poll::Ready(Some(projected_batch))
}
diff --git a/analytic_engine/src/table/data.rs
b/analytic_engine/src/table/data.rs
index 998c5c3b..4d26448f 100644
--- a/analytic_engine/src/table/data.rs
+++ b/analytic_engine/src/table/data.rs
@@ -54,6 +54,7 @@ use crate::{
memtable::{
columnar::factory::ColumnarMemTableFactory,
factory::{FactoryRef as MemTableFactoryRef, Options as
MemTableOptions},
+ layered::factory::LayeredMemtableFactory,
skiplist::factory::SkiplistMemTableFactory,
MemtableType,
},
@@ -70,9 +71,7 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to create memtable, err:{}", source))]
- CreateMemTable {
- source: crate::memtable::factory::Error,
- },
+ CreateMemTable { source: crate::memtable::Error },
#[snafu(display(
"Failed to find or create memtable, timestamp overflow,
timestamp:{:?}, duration:{:?}.\nBacktrace:\n{}",
@@ -232,6 +231,9 @@ pub struct TableData {
/// Whether enable primary key sampling
enable_primary_key_sampling: bool,
+ /// Whether enable layered memtable
+ pub enable_layered_memtable: bool,
+
/// Metrics of this table
pub metrics: Metrics,
@@ -315,7 +317,22 @@ impl TableData {
let memtable_factory: MemTableFactoryRef = match opts.memtable_type {
MemtableType::SkipList => Arc::new(SkiplistMemTableFactory),
- MemtableType::Columnar => Arc::new(ColumnarMemTableFactory),
+ MemtableType::Column => Arc::new(ColumnarMemTableFactory),
+ };
+
+ // Wrap it by `LayeredMemtable`.
+ let mutable_segment_switch_threshold = opts
+ .layered_memtable_opts
+ .mutable_segment_switch_threshold
+ .0 as usize;
+ let enable_layered_memtable = mutable_segment_switch_threshold > 0;
+ let memtable_factory = if enable_layered_memtable {
+ Arc::new(LayeredMemtableFactory::new(
+ memtable_factory,
+ mutable_segment_switch_threshold,
+ ))
+ } else {
+ memtable_factory
};
let purge_queue = purger.create_purge_queue(space_id, id);
@@ -355,6 +372,7 @@ impl TableData {
manifest_updates: AtomicUsize::new(0),
manifest_snapshot_every_n_updates,
enable_primary_key_sampling,
+ enable_layered_memtable,
})
}
@@ -376,7 +394,27 @@ impl TableData {
metrics_opt,
enable_primary_key_sampling,
} = config;
- let memtable_factory = Arc::new(SkiplistMemTableFactory);
+
+ let memtable_factory: MemTableFactoryRef = match
add_meta.opts.memtable_type {
+ MemtableType::SkipList => Arc::new(SkiplistMemTableFactory),
+ MemtableType::Column => Arc::new(ColumnarMemTableFactory),
+ };
+ // Maybe wrap it by `LayeredMemtable`.
+ let mutable_segment_switch_threshold = add_meta
+ .opts
+ .layered_memtable_opts
+ .mutable_segment_switch_threshold
+ .0 as usize;
+ let enable_layered_memtable = mutable_segment_switch_threshold > 0;
+ let memtable_factory = if enable_layered_memtable {
+ Arc::new(LayeredMemtableFactory::new(
+ memtable_factory,
+ mutable_segment_switch_threshold,
+ )) as _
+ } else {
+ memtable_factory as _
+ };
+
let purge_queue = purger.create_purge_queue(add_meta.space_id,
add_meta.table_id);
let current_version =
TableVersion::new(mem_size_options.size_sampling_interval,
purge_queue);
@@ -410,6 +448,7 @@ impl TableData {
manifest_updates: AtomicUsize::new(0),
manifest_snapshot_every_n_updates,
enable_primary_key_sampling,
+ enable_layered_memtable,
})
}
diff --git a/analytic_engine/src/table_meta_set_impl.rs
b/analytic_engine/src/table_meta_set_impl.rs
index 00ead80a..fa69642b 100644
--- a/analytic_engine/src/table_meta_set_impl.rs
+++ b/analytic_engine/src/table_meta_set_impl.rs
@@ -134,6 +134,7 @@ impl TableMetaSetImpl {
collector: space.mem_usage_collector.clone(),
size_sampling_interval: space.mem_usage_sampling_interval,
};
+
let table_data = Arc::new(
TableData::new(
TableDesc {
diff --git a/analytic_engine/src/table_options.rs
b/analytic_engine/src/table_options.rs
index 35b0afc6..c33a75db 100644
--- a/analytic_engine/src/table_options.rs
+++ b/analytic_engine/src/table_options.rs
@@ -33,7 +33,7 @@ use crate::{
compaction::{
self, CompactionStrategy, SizeTieredCompactionOptions,
TimeWindowCompactionOptions,
},
- memtable::MemtableType,
+ memtable::{LayeredMemtableOptions, MemtableType},
};
const UPDATE_MODE_OVERWRITE: &str = "OVERWRITE";
@@ -98,6 +98,7 @@ pub enum Error {
backtrace
))]
ParseUpdateMode { s: String, backtrace: Backtrace },
+
#[snafu(display(
"Failed to parse compression, name:{}.\nBacktrace:\n{}",
name,
@@ -134,6 +135,17 @@ pub enum Error {
backtrace
))]
HybridDeprecated { backtrace: Backtrace },
+
+ #[snafu(display(
+ "Failed to parse layered memtable options,
err:{source}.\nBacktrace:\n{backtrace}",
+ ))]
+ ParseLayeredMemtableOptions {
+ source: crate::memtable::Error,
+ backtrace: Backtrace,
+ },
+
+ #[snafu(display("Layered memtable options is
missing.\nBacktrace:\n{backtrace}",))]
+ MissingLayeredMemtableOptions { backtrace: Backtrace },
}
define_result!(Error);
@@ -396,8 +408,11 @@ pub struct TableOptions {
pub num_rows_per_row_group: usize,
/// Table Compression
pub compression: Compression,
+
/// Memtable type
pub memtable_type: MemtableType,
+ /// Layered memtable options
+ pub layered_memtable_opts: LayeredMemtableOptions,
}
impl TableOptions {
@@ -583,6 +598,8 @@ impl From<TableOptions> for manifest_pb::TableOptions {
),
};
+ let layered_memtable_opts = opts.layered_memtable_opts.into();
+
manifest_pb::TableOptions {
segment_duration,
enable_ttl: opts.enable_ttl,
@@ -598,6 +615,7 @@ impl From<TableOptions> for manifest_pb::TableOptions {
storage_format_hint: Some(manifest_pb::StorageFormatHint::from(
opts.storage_format_hint,
)),
+ layered_memtable_options: Some(layered_memtable_opts),
// TODO: persist `memtable_type` in PB.
}
}
@@ -659,6 +677,11 @@ impl TryFrom<manifest_pb::TableOptions> for TableOptions {
};
let storage_format_hint =
opts.storage_format_hint.context(MissingStorageFormatHint)?;
+ let layered_memtable_opts = opts
+ .layered_memtable_options
+ .context(MissingLayeredMemtableOptions)?
+ .into();
+
let table_opts = Self {
segment_duration,
enable_ttl: opts.enable_ttl,
@@ -671,6 +694,7 @@ impl TryFrom<manifest_pb::TableOptions> for TableOptions {
compression: Compression::from(compression),
storage_format_hint:
StorageFormatHint::try_from(storage_format_hint)?,
memtable_type: MemtableType::SkipList,
+ layered_memtable_opts,
};
Ok(table_opts)
@@ -691,6 +715,7 @@ impl Default for TableOptions {
compression: Compression::Zstd,
storage_format_hint: StorageFormatHint::default(),
memtable_type: MemtableType::SkipList,
+ layered_memtable_opts: LayeredMemtableOptions::default(),
}
}
}
@@ -712,54 +737,59 @@ pub fn merge_table_options_for_alter(
/// The options will override the old options.
fn merge_table_options(
options: &HashMap<String, String>,
- table_old_opts: &TableOptions,
+ base_table_opts: &TableOptions,
is_create: bool,
) -> Result<TableOptions> {
- let mut table_opts = table_old_opts.clone();
+ let mut base_table_opts = base_table_opts.clone();
if is_create {
if let Some(v) = options.get(SEGMENT_DURATION) {
if v.is_empty() {
- table_opts.segment_duration = None;
+ base_table_opts.segment_duration = None;
} else {
- table_opts.segment_duration =
Some(parse_duration(v).context(ParseDuration)?);
+ base_table_opts.segment_duration =
Some(parse_duration(v).context(ParseDuration)?);
}
}
if let Some(v) = options.get(UPDATE_MODE) {
- table_opts.update_mode = UpdateMode::parse_from(v)?;
+ base_table_opts.update_mode = UpdateMode::parse_from(v)?;
}
}
if let Some(v) = options.get(TTL) {
- table_opts.ttl = parse_duration(v).context(ParseDuration)?;
+ base_table_opts.ttl = parse_duration(v).context(ParseDuration)?;
}
if let Some(v) = options.get(OPTION_KEY_ENABLE_TTL) {
- table_opts.enable_ttl = v.parse::<bool>().context(ParseBool)?;
+ base_table_opts.enable_ttl = v.parse::<bool>().context(ParseBool)?;
}
if let Some(v) = options.get(ARENA_BLOCK_SIZE) {
let size = parse_size(v)?;
- table_opts.arena_block_size = size.0 as u32;
+ base_table_opts.arena_block_size = size.0 as u32;
}
if let Some(v) = options.get(WRITE_BUFFER_SIZE) {
let size = parse_size(v)?;
- table_opts.write_buffer_size = size.0 as u32;
+ base_table_opts.write_buffer_size = size.0 as u32;
}
if let Some(v) = options.get(COMPACTION_STRATEGY) {
- table_opts.compaction_strategy =
+ base_table_opts.compaction_strategy =
CompactionStrategy::parse_from(v, options).context(ParseStrategy {
value: v })?;
}
if let Some(v) = options.get(NUM_ROWS_PER_ROW_GROUP) {
- table_opts.num_rows_per_row_group = v.parse().context(ParseInt)?;
+ base_table_opts.num_rows_per_row_group = v.parse().context(ParseInt)?;
}
if let Some(v) = options.get(COMPRESSION) {
- table_opts.compression = Compression::parse_from(v)?;
+ base_table_opts.compression = Compression::parse_from(v)?;
}
if let Some(v) = options.get(STORAGE_FORMAT) {
- table_opts.storage_format_hint = v.as_str().try_into()?;
+ base_table_opts.storage_format_hint = v.as_str().try_into()?;
}
if let Some(v) = options.get(MEMTABLE_TYPE) {
- table_opts.memtable_type = MemtableType::parse_from(v);
+ base_table_opts.memtable_type = MemtableType::parse_from(v);
}
- Ok(table_opts)
+
+ let layered_memtable_opts =
+
LayeredMemtableOptions::parse_from(options).context(ParseLayeredMemtableOptions)?;
+ base_table_opts.layered_memtable_opts = layered_memtable_opts;
+
+ Ok(base_table_opts)
}
fn parse_size(v: &str) -> Result<ReadableSize> {
diff --git a/benchmarks/src/scan_memtable_bench.rs
b/benchmarks/src/scan_memtable_bench.rs
index 72e09a05..79c9378c 100644
--- a/benchmarks/src/scan_memtable_bench.rs
+++ b/benchmarks/src/scan_memtable_bench.rs
@@ -25,7 +25,10 @@ use analytic_engine::{
sst::meta_data::cache::MetaCacheRef,
};
use arena::NoopCollector;
-use common_types::projected_schema::{ProjectedSchema, RowProjectorBuilder};
+use common_types::{
+ projected_schema::{ProjectedSchema, RowProjectorBuilder},
+ time::TimeRange,
+};
use logger::info;
use object_store::{LocalFileSystem, Path};
@@ -103,6 +106,7 @@ impl ScanMemTableBench {
reverse: false,
metrics_collector: None,
row_projector_builder,
+ time_range: TimeRange::min_to_max(),
};
let iter = self.memtable.scan(scan_ctx, scan_req).unwrap();
diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs
index 78995e12..f54e92ae 100644
--- a/common_types/src/lib.rs
+++ b/common_types/src/lib.rs
@@ -50,6 +50,7 @@ pub const UPDATE_MODE: &str = "update_mode";
pub const COMPRESSION: &str = "compression";
pub const STORAGE_FORMAT: &str = "storage_format";
pub const MEMTABLE_TYPE: &str = "memtable_type";
+pub const MUTABLE_SEGMENT_SWITCH_THRESHOLD: &str =
"mutable_segment_switch_threshold";
#[cfg(any(test, feature = "test"))]
pub mod tests;
diff --git a/common_types/src/projected_schema.rs
b/common_types/src/projected_schema.rs
index d0f780d8..1ef458e9 100644
--- a/common_types/src/projected_schema.rs
+++ b/common_types/src/projected_schema.rs
@@ -224,7 +224,7 @@ impl RowProjector {
/// The projected indexes of all columns(existed and not exist) in the
/// projected source schema.
- pub fn fetched_projected_source_column_indexes(&self) -> &[Option<usize>] {
+ pub fn target_record_projection_remapping(&self) -> &[Option<usize>] {
&self.target_record_projection_remapping
}
diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs
index 1b7d610d..d4a3b24a 100644
--- a/common_types/src/record_batch.rs
+++ b/common_types/src/record_batch.rs
@@ -29,7 +29,7 @@ use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use crate::{
column_block::{cast_nanosecond_to_mills, ColumnBlock, ColumnBlockBuilder},
datum::DatumKind,
- projected_schema::{ProjectedSchema, RowProjector},
+ projected_schema::ProjectedSchema,
row::{
contiguous::{ContiguousRow, ProjectedContiguousRow},
Row, RowViewOnBatch,
@@ -279,6 +279,11 @@ impl RecordBatch {
pub fn into_arrow_record_batch(self) -> ArrowRecordBatch {
self.data.arrow_record_batch
}
+
+ #[inline]
+ pub fn into_record_batch_data(self) -> RecordBatchData {
+ self.data
+ }
}
impl TryFrom<ArrowRecordBatch> for RecordBatch {
@@ -371,14 +376,16 @@ pub struct FetchedRecordBatch {
}
impl FetchedRecordBatch {
- pub fn try_new(ctx: &RowProjector, arrow_record_batch: ArrowRecordBatch)
-> Result<Self> {
- let column_indexes = ctx.fetched_projected_source_column_indexes();
- let schema = ctx.fetched_schema().clone();
- let mut column_blocks = Vec::with_capacity(schema.num_columns());
-
+ pub fn try_new(
+ fetched_schema: RecordSchema,
+ primary_key_indexes: Option<Vec<usize>>,
+ column_indexes: &[Option<usize>],
+ arrow_record_batch: ArrowRecordBatch,
+ ) -> Result<Self> {
+ let mut column_blocks =
Vec::with_capacity(fetched_schema.num_columns());
let num_rows = arrow_record_batch.num_rows();
let num_columns = arrow_record_batch.num_columns();
- for (col_idx_opt, col_schema) in
column_indexes.iter().zip(schema.columns()) {
+ for (col_idx_opt, col_schema) in
column_indexes.iter().zip(fetched_schema.columns()) {
match col_idx_opt {
Some(col_idx) => {
ensure!(
@@ -409,11 +416,11 @@ impl FetchedRecordBatch {
}
}
- let data = RecordBatchData::new(schema.to_arrow_schema_ref(),
column_blocks)?;
+ let data = RecordBatchData::new(fetched_schema.to_arrow_schema_ref(),
column_blocks)?;
Ok(FetchedRecordBatch {
- schema,
- primary_key_indexes: ctx.primary_key_indexes().map(|idxs|
idxs.to_vec()),
+ schema: fetched_schema,
+ primary_key_indexes,
data,
})
}
diff --git a/common_types/src/time.rs b/common_types/src/time.rs
index 342a5509..14266d67 100644
--- a/common_types/src/time.rs
+++ b/common_types/src/time.rs
@@ -300,6 +300,13 @@ impl TimeRange {
self.exclusive_end.min(other.exclusive_end),
)
}
+
+    /// Returns the smallest range covering both `self` and `other`
+    /// (the union hull; for disjoint inputs the gap between them is
+    /// included in the result).
+    pub fn merge_range(&self, other: TimeRange) -> TimeRange {
+        TimeRange {
+            inclusive_start: self.inclusive_start.min(other.inclusive_start),
+            exclusive_end: self.exclusive_end.max(other.exclusive_end),
+        }
+    }
}
impl From<TimeRange> for time_range::TimeRange {
diff --git a/system_catalog/src/sys_catalog_table.rs
b/system_catalog/src/sys_catalog_table.rs
index 00f7b061..0eb8c523 100644
--- a/system_catalog/src/sys_catalog_table.rs
+++ b/system_catalog/src/sys_catalog_table.rs
@@ -309,6 +309,11 @@ impl SysCatalogTable {
common_types::OPTION_KEY_ENABLE_TTL.to_string(),
DEFAULT_ENABLE_TTL.to_string(),
);
+ // Disable layered memtable for system catalog table.
+ options.insert(
+ common_types::MUTABLE_SEGMENT_SWITCH_THRESHOLD.to_string(),
+ 0.to_string(),
+ );
let params = CreateTableParams {
catalog_name: consts::SYSTEM_CATALOG.to_string(),
schema_name: consts::SYSTEM_CATALOG_SCHEMA.to_string(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]