This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new 53df95ac feat: impl layered memtable to reduce duplicated encode 
during scan (#1271)
53df95ac is described below

commit 53df95ac507d7658b7e6abb44820a49cc62bf6d4
Author: kamille <[email protected]>
AuthorDate: Mon Jan 8 15:41:20 2024 +0800

    feat: impl layered memtable to reduce duplicated encode during scan (#1271)
    
    ## Rationale
    Conversion from the row format in the memtable to a record batch in datafusion
    has been found to be a CPU bottleneck in production. To reduce the CPU cost, I
    implemented a layered memtable framework to support gradual conversion
    during the normal write path (and before flush).
    
    ## Detailed Changes
    + Impl layered memtable framework
    + Integrate it into the write path.
    
    ## Test Plan
    Tested by new unit tests and integration tests.
---
 Cargo.lock                                         |  32 +-
 Cargo.toml                                         |   2 +-
 analytic_engine/src/instance/flush_compaction.rs   |  30 +-
 analytic_engine/src/lib.rs                         |   6 +
 analytic_engine/src/memtable/columnar/factory.rs   |   5 +-
 analytic_engine/src/memtable/factory.rs            |   9 +-
 .../src/memtable/{skiplist => layered}/factory.rs  |  41 +-
 analytic_engine/src/memtable/layered/iter.rs       | 120 ++++
 analytic_engine/src/memtable/layered/mod.rs        | 729 +++++++++++++++++++++
 analytic_engine/src/memtable/mod.rs                |  73 ++-
 analytic_engine/src/memtable/skiplist/factory.rs   |   4 +-
 analytic_engine/src/memtable/skiplist/mod.rs       | 133 ++--
 analytic_engine/src/memtable/test_util.rs          |  42 ++
 .../src/row_iter/record_batch_stream.rs            |   1 +
 analytic_engine/src/sst/parquet/async_reader.rs    |  19 +-
 analytic_engine/src/table/data.rs                  |  49 +-
 analytic_engine/src/table_meta_set_impl.rs         |   1 +
 analytic_engine/src/table_options.rs               |  62 +-
 benchmarks/src/scan_memtable_bench.rs              |   6 +-
 common_types/src/lib.rs                            |   1 +
 common_types/src/projected_schema.rs               |   2 +-
 common_types/src/record_batch.rs                   |  27 +-
 common_types/src/time.rs                           |   7 +
 system_catalog/src/sys_catalog_table.rs            |   5 +
 24 files changed, 1254 insertions(+), 152 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b1f6b823..088009fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -96,7 +96,7 @@ dependencies = [
  "atomic_enum",
  "base64 0.13.1",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "codec",
  "common_types",
  "datafusion",
@@ -1345,7 +1345,7 @@ dependencies = [
 [[package]]
 name = "ceresdbproto"
 version = "1.0.23"
-source = 
"git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc#cfdacccebb7c609cb1aac791b73ba9a838d7ade6";
+source = 
"git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32#cb36a32b827b3afe88ca04420f7fd21518f15293";
 dependencies = [
  "prost",
  "protoc-bin-vendored",
@@ -1528,7 +1528,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "common_types",
  "etcd-client",
  "future_ext",
@@ -1606,7 +1606,7 @@ dependencies = [
  "arrow 43.0.0",
  "arrow_ext",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "chrono",
  "datafusion",
  "hash_ext",
@@ -2362,7 +2362,7 @@ dependencies = [
  "async-recursion",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -3921,7 +3921,7 @@ name = "meta_client"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -4446,7 +4446,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "chrono",
  "clru",
  "crc",
@@ -5323,7 +5323,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "clru",
  "cluster",
  "common_types",
@@ -5451,7 +5451,7 @@ dependencies = [
  "arrow 43.0.0",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "chrono",
  "cluster",
  "codec",
@@ -5765,7 +5765,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "arrow_ext",
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -5894,7 +5894,7 @@ name = "router"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "cluster",
  "common_types",
  "generic_error",
@@ -6269,7 +6269,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "clru",
  "cluster",
  "common_types",
@@ -6795,7 +6795,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "codec",
  "common_types",
  "futures 0.3.28",
@@ -6817,7 +6817,7 @@ dependencies = [
  "arrow_ext",
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -7020,7 +7020,7 @@ dependencies = [
 name = "time_ext"
 version = "1.2.6-alpha"
 dependencies = [
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "chrono",
  "common_types",
  "macros",
@@ -7672,7 +7672,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=cfdaccc)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/apache/incubator-horaedb-proto.git?rev=cb36a32)",
  "chrono",
  "codec",
  "common_types",
diff --git a/Cargo.toml b/Cargo.toml
index 1588c237..e20984f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git";, rev = 
"cfdaccc" }
+ceresdbproto = { git = 
"https://github.com/apache/incubator-horaedb-proto.git";, rev = "cb36a32" }
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
diff --git a/analytic_engine/src/instance/flush_compaction.rs 
b/analytic_engine/src/instance/flush_compaction.rs
index 880eb10c..b051fb04 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -63,7 +63,7 @@ use crate::{
         factory::{self, ColumnStats, ScanOptions, SstWriteOptions},
         file::{FileMeta, Level},
         meta_data::{SstMetaData, SstMetaReader},
-        writer::{MetaData, RecordBatchStream},
+        writer::MetaData,
     },
     table::{
         data::{self, TableData, TableDataRef},
@@ -367,7 +367,7 @@ impl FlushTask {
         let mut last_sequence = table_data.last_sequence();
         // Switch (freeze) all mutable memtables. And update segment duration 
if
         // suggestion is returned.
-        let mut need_reorder = false;
+        let mut need_reorder = table_data.enable_layered_memtable;
         if let Some(suggest_segment_duration) = 
current_version.suggest_duration() {
             info!(
                 "Update segment duration, table:{}, table_id:{}, 
segment_duration:{:?}",
@@ -483,7 +483,9 @@ impl FlushTask {
             }
         }
         for mem in &mems_to_flush.memtables {
-            let file = self.dump_normal_memtable(request_id.clone(), 
mem).await?;
+            let file = self
+                .dump_normal_memtable(request_id.clone(), mem, need_reorder)
+                .await?;
             if let Some(file) = file {
                 let sst_size = file.size;
                 files_to_level0.push(AddFile {
@@ -728,6 +730,7 @@ impl FlushTask {
         &self,
         request_id: RequestId,
         memtable_state: &MemTableState,
+        need_reorder: bool,
     ) -> Result<Option<FileMeta>> {
         let (min_key, max_key) = match (memtable_state.mem.min_key(), 
memtable_state.mem.max_key())
         {
@@ -778,8 +781,24 @@ impl FlushTask {
 
         let iter = build_mem_table_iter(memtable_state.mem.clone(), 
&self.table_data)?;
 
-        let record_batch_stream: RecordBatchStream =
-            Box::new(stream::iter(iter).map_err(|e| Box::new(e) as _));
+        let record_batch_stream = if need_reorder {
+            let schema = self.table_data.schema();
+            let primary_key_indexes = schema.primary_key_indexes().to_vec();
+            let reorder = Reorder {
+                iter,
+                schema,
+                order_by_col_indexes: primary_key_indexes,
+            };
+            Box::new(
+                reorder
+                    .into_stream()
+                    .await
+                    .context(ReorderMemIter)?
+                    .map(|batch| batch.box_err()),
+            ) as _
+        } else {
+            Box::new(stream::iter(iter).map(|batch| batch.box_err())) as _
+        };
 
         let sst_info = writer
             .write(request_id, &sst_meta, record_batch_stream)
@@ -1231,6 +1250,7 @@ fn build_mem_table_iter(
         need_dedup: table_data.dedup(),
         reverse: false,
         metrics_collector: None,
+        time_range: TimeRange::min_to_max(),
     };
     memtable
         .scan(scan_ctx, scan_req)
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 68f79683..43515a6e 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -83,6 +83,11 @@ pub struct Config {
     /// The ratio of table's write buffer size to trigger preflush, and it
     /// should be in the range (0, 1].
     pub preflush_write_buffer_size_ratio: f32,
+
+    /// The threshold to trigger switching mutable segment of memtable.
+    /// If it is zero, disable the layered memtable.
+    pub mutable_segment_switch_threshold: ReadableSize,
+
     pub enable_primary_key_sampling: bool,
 
     // Iterator scanning options
@@ -200,6 +205,7 @@ impl Default for Config {
             remote_engine_client: 
remote_engine_client::config::Config::default(),
             recover_mode: RecoverMode::TableBased,
             metrics: MetricsOptions::default(),
+            mutable_segment_switch_threshold: ReadableSize::mb(3),
         }
     }
 }
diff --git a/analytic_engine/src/memtable/columnar/factory.rs 
b/analytic_engine/src/memtable/columnar/factory.rs
index 1e8f0604..6ce4f996 100644
--- a/analytic_engine/src/memtable/columnar/factory.rs
+++ b/analytic_engine/src/memtable/columnar/factory.rs
@@ -24,10 +24,9 @@ use std::{
 
 use crate::memtable::{
     columnar::ColumnarMemTable,
-    factory::{Factory, Options, Result},
-    MemTableRef,
+    factory::{Factory, Options},
+    MemTableRef, Result,
 };
-
 /// Factory to create memtable
 #[derive(Debug)]
 pub struct ColumnarMemTableFactory;
diff --git a/analytic_engine/src/memtable/factory.rs 
b/analytic_engine/src/memtable/factory.rs
index 97a7bd43..b6952e6a 100644
--- a/analytic_engine/src/memtable/factory.rs
+++ b/analytic_engine/src/memtable/factory.rs
@@ -18,15 +18,8 @@ use std::{fmt, sync::Arc};
 
 use arena::CollectorRef;
 use common_types::{schema::Schema, SequenceNumber};
-use macros::define_result;
-use snafu::Snafu;
 
-use crate::memtable::MemTableRef;
-
-#[derive(Debug, Snafu)]
-pub enum Error {}
-
-define_result!(Error);
+use crate::memtable::{MemTableRef, Result};
 
 /// MemTable options
 #[derive(Clone)]
diff --git a/analytic_engine/src/memtable/skiplist/factory.rs 
b/analytic_engine/src/memtable/layered/factory.rs
similarity index 52%
copy from analytic_engine/src/memtable/skiplist/factory.rs
copy to analytic_engine/src/memtable/layered/factory.rs
index 1e6e8b1d..03c793b9 100644
--- a/analytic_engine/src/memtable/skiplist/factory.rs
+++ b/analytic_engine/src/memtable/layered/factory.rs
@@ -14,31 +14,38 @@
 
 //! Skiplist memtable factory
 
-use std::sync::{atomic::AtomicU64, Arc};
-
-use arena::MonoIncArena;
-use skiplist::{BytewiseComparator, Skiplist};
+use std::sync::Arc;
 
 use crate::memtable::{
-    factory::{Factory, Options, Result},
-    skiplist::SkiplistMemTable,
-    MemTableRef,
+    factory::{Factory, FactoryRef, Options},
+    layered::LayeredMemTable,
+    MemTableRef, Result,
 };
 
 /// Factory to create memtable
 #[derive(Debug)]
-pub struct SkiplistMemTableFactory;
+pub struct LayeredMemtableFactory {
+    inner_memtable_factory: FactoryRef,
+    mutable_switch_threshold: usize,
+}
+
+impl LayeredMemtableFactory {
+    pub fn new(inner_memtable_factory: FactoryRef, mutable_switch_threshold: 
usize) -> Self {
+        Self {
+            inner_memtable_factory,
+            mutable_switch_threshold,
+        }
+    }
+}
 
-impl Factory for SkiplistMemTableFactory {
+impl Factory for LayeredMemtableFactory {
     fn create_memtable(&self, opts: Options) -> Result<MemTableRef> {
-        let arena = MonoIncArena::with_collector(opts.arena_block_size as 
usize, opts.collector);
-        let skiplist = Skiplist::with_arena(BytewiseComparator, arena);
-        let memtable = Arc::new(SkiplistMemTable::new(
-            opts.schema,
-            skiplist,
-            AtomicU64::new(opts.creation_sequence),
-        ));
+        let memtable = LayeredMemTable::new(
+            &opts,
+            self.inner_memtable_factory.clone(),
+            self.mutable_switch_threshold,
+        )?;
 
-        Ok(memtable)
+        Ok(Arc::new(memtable))
     }
 }
diff --git a/analytic_engine/src/memtable/layered/iter.rs 
b/analytic_engine/src/memtable/layered/iter.rs
new file mode 100644
index 00000000..6e1f3030
--- /dev/null
+++ b/analytic_engine/src/memtable/layered/iter.rs
@@ -0,0 +1,120 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Skiplist memtable iterator
+
+use common_types::{record_batch::FetchedRecordBatch, schema::Schema, 
time::TimeRange};
+use generic_error::BoxError;
+use snafu::ResultExt;
+
+use crate::memtable::{
+    layered::{ImmutableSegment, MutableSegment},
+    ColumnarIterPtr, Internal, ProjectSchema, Result, ScanContext, ScanRequest,
+};
+
+/// Columnar iterator for [LayeredMemTable]
+pub(crate) struct ColumnarIterImpl {
+    selected_batch_iter: ColumnarIterPtr,
+}
+
+impl ColumnarIterImpl {
+    pub fn new(
+        memtable_schema: &Schema,
+        ctx: ScanContext,
+        request: ScanRequest,
+        mutable: &MutableSegment,
+        immutables: &[ImmutableSegment],
+    ) -> Result<Self> {
+        // Create projection for the memtable schema
+        let row_projector = request
+            .row_projector_builder
+            .build(memtable_schema)
+            .context(ProjectSchema)?;
+
+        let (maybe_mutable, selected_immutables) =
+            Self::filter_by_time_range(mutable, immutables, 
request.time_range);
+
+        let immutable_batches = selected_immutables
+            .flat_map(|imm| {
+                imm.record_batches().iter().map(|batch| {
+                    // TODO: reduce clone here.
+                    let fetched_schema = 
row_projector.fetched_schema().clone();
+                    let primary_key_indexes = row_projector
+                        .primary_key_indexes()
+                        .map(|idxs| idxs.to_vec());
+                    let fetched_column_indexes = 
row_projector.fetched_source_column_indexes();
+                    FetchedRecordBatch::try_new(
+                        fetched_schema,
+                        primary_key_indexes,
+                        fetched_column_indexes,
+                        batch.clone(),
+                    )
+                    .box_err()
+                    .with_context(|| Internal {
+                        msg: format!("row_projector:{row_projector:?}",),
+                    })
+                })
+            })
+            .collect::<Vec<_>>();
+        let immutable_iter = immutable_batches.into_iter();
+
+        let maybe_mutable_iter = match maybe_mutable {
+            Some(mutable) => Some(mutable.scan(ctx, request)?),
+            None => None,
+        };
+
+        let maybe_chained_iter = match maybe_mutable_iter {
+            Some(mutable_iter) => Box::new(mutable_iter.chain(immutable_iter)) 
as _,
+            None => Box::new(immutable_iter) as _,
+        };
+
+        Ok(Self {
+            selected_batch_iter: maybe_chained_iter,
+        })
+    }
+
+    fn filter_by_time_range<'a>(
+        mutable: &'a MutableSegment,
+        immutables: &'a [ImmutableSegment],
+        time_range: TimeRange,
+    ) -> (
+        Option<&'a MutableSegment>,
+        impl Iterator<Item = &'a ImmutableSegment>,
+    ) {
+        let maybe_mutable = {
+            let mutable_time_range = mutable.time_range();
+            mutable_time_range.and_then(|range| {
+                if range.intersect_with(time_range) {
+                    Some(mutable)
+                } else {
+                    None
+                }
+            })
+        };
+
+        let selected_immutables = immutables
+            .iter()
+            .filter(move |imm| imm.time_range().intersect_with(time_range));
+
+        (maybe_mutable, selected_immutables)
+    }
+}
+
+impl Iterator for ColumnarIterImpl {
+    type Item = Result<FetchedRecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.selected_batch_iter.next()
+    }
+}
diff --git a/analytic_engine/src/memtable/layered/mod.rs 
b/analytic_engine/src/memtable/layered/mod.rs
new file mode 100644
index 00000000..92087e1e
--- /dev/null
+++ b/analytic_engine/src/memtable/layered/mod.rs
@@ -0,0 +1,729 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! MemTable based on skiplist
+
+pub mod factory;
+pub mod iter;
+
+use std::{
+    mem,
+    ops::{Bound, Deref},
+    sync::{
+        atomic::{self, AtomicU64},
+        RwLock,
+    },
+};
+
+use arena::CollectorRef;
+use arrow::record_batch::RecordBatch as ArrowRecordBatch;
+use bytes_ext::Bytes;
+use common_types::{
+    projected_schema::RowProjectorBuilder, row::Row, schema::Schema, 
time::TimeRange,
+    SequenceNumber,
+};
+use generic_error::BoxError;
+use logger::debug;
+use skiplist::{BytewiseComparator, KeyComparator};
+use snafu::{OptionExt, ResultExt};
+
+use crate::memtable::{
+    factory::{FactoryRef, Options},
+    key::KeySequence,
+    layered::iter::ColumnarIterImpl,
+    ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics 
as MemtableMetrics,
+    PutContext, Result, ScanContext, ScanRequest,
+};
+
+/// MemTable implementation based on skiplist
+pub(crate) struct LayeredMemTable {
+    /// Schema of this memtable, is immutable.
+    schema: Schema,
+
+    /// The last sequence of the rows in this memtable. Update to this field
+    /// require external synchronization.
+    last_sequence: AtomicU64,
+
+    inner: RwLock<Inner>,
+
+    mutable_switch_threshold: usize,
+}
+
+impl LayeredMemTable {
+    pub fn new(
+        opts: &Options,
+        inner_memtable_factory: FactoryRef,
+        mutable_switch_threshold: usize,
+    ) -> Result<Self> {
+        let inner = Inner::new(inner_memtable_factory, opts)?;
+
+        Ok(Self {
+            schema: opts.schema.clone(),
+            last_sequence: AtomicU64::new(opts.creation_sequence),
+            inner: RwLock::new(inner),
+            mutable_switch_threshold,
+        })
+    }
+
+    // Used for testing only
+    #[cfg(test)]
+    fn force_switch_mutable_segment(&self) -> Result<()> {
+        let inner = &mut *self.inner.write().unwrap();
+        inner.switch_mutable_segment(self.schema.clone())
+    }
+}
+
+impl MemTable for LayeredMemTable {
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn min_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().min_key()
+    }
+
+    fn max_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().max_key()
+    }
+
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        let memory_usage = {
+            let inner = self.inner.read().unwrap();
+            inner.put(ctx, sequence, row, schema)?;
+            inner.mutable_segment.0.approximate_memory_usage()
+        };
+
+        if memory_usage > self.mutable_switch_threshold {
+            debug!(
+                "LayeredMemTable put, memory_usage:{memory_usage}, 
mutable_switch_threshold:{}",
+                self.mutable_switch_threshold
+            );
+            let inner = &mut *self.inner.write().unwrap();
+            inner.switch_mutable_segment(self.schema.clone())?;
+        }
+
+        Ok(())
+    }
+
+    fn scan(&self, ctx: ScanContext, request: ScanRequest) -> 
Result<ColumnarIterPtr> {
+        let inner = self.inner.read().unwrap();
+        inner.scan(&self.schema, ctx, request)
+    }
+
+    fn approximate_memory_usage(&self) -> usize {
+        self.inner.read().unwrap().approximate_memory_usage()
+    }
+
+    fn set_last_sequence(&self, sequence: SequenceNumber) -> Result<()> {
+        self.last_sequence
+            .store(sequence, atomic::Ordering::Relaxed);
+        Ok(())
+    }
+
+    fn last_sequence(&self) -> SequenceNumber {
+        self.last_sequence.load(atomic::Ordering::Relaxed)
+    }
+
+    fn time_range(&self) -> Option<TimeRange> {
+        let inner = self.inner.read().unwrap();
+        inner.time_range()
+    }
+
+    fn metrics(&self) -> MemtableMetrics {
+        // FIXME: stats and return metrics
+        MemtableMetrics::default()
+    }
+}
+
+/// Layered memtable inner
+struct Inner {
+    mutable_segment_builder: MutableSegmentBuilder,
+    mutable_segment: MutableSegment,
+    immutable_segments: Vec<ImmutableSegment>,
+}
+
+impl Inner {
+    fn new(memtable_factory: FactoryRef, opts: &Options) -> Result<Self> {
+        let builder_opts = MutableBuilderOptions {
+            schema: opts.schema.clone(),
+            arena_block_size: opts.arena_block_size,
+            collector: opts.collector.clone(),
+        };
+        let mutable_segment_builder = 
MutableSegmentBuilder::new(memtable_factory, builder_opts);
+
+        // Build the first mutable batch.
+        let init_mutable_segment = mutable_segment_builder.build()?;
+
+        Ok(Self {
+            mutable_segment_builder,
+            mutable_segment: init_mutable_segment,
+            immutable_segments: vec![],
+        })
+    }
+
+    /// Scan batches including `mutable` and `immutable`s.
+    #[inline]
+    fn scan(
+        &self,
+        schema: &Schema,
+        ctx: ScanContext,
+        request: ScanRequest,
+    ) -> Result<ColumnarIterPtr> {
+        let iter = ColumnarIterImpl::new(
+            schema,
+            ctx,
+            request,
+            &self.mutable_segment,
+            &self.immutable_segments,
+        )?;
+        Ok(Box::new(iter))
+    }
+
+    #[inline]
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        self.mutable_segment.put(ctx, sequence, row, schema)
+    }
+
+    fn switch_mutable_segment(&mut self, schema: Schema) -> Result<()> {
+        let imm_num = self.immutable_segments.len();
+        debug!("LayeredMemTable switch_mutable_segment, imm_num:{imm_num}");
+
+        // Build a new mutable segment, and replace current's.
+        let new_mutable = self.mutable_segment_builder.build()?;
+        let current_mutable = mem::replace(&mut self.mutable_segment, 
new_mutable);
+        let fetched_schema = schema.to_record_schema();
+
+        // Convert current's to immutable.
+        let scan_ctx = ScanContext::default();
+        let row_projector_builder = RowProjectorBuilder::new(fetched_schema, 
schema, None);
+        let scan_req = ScanRequest {
+            start_user_key: Bound::Unbounded,
+            end_user_key: Bound::Unbounded,
+            sequence: common_types::MAX_SEQUENCE_NUMBER,
+            need_dedup: false,
+            reverse: false,
+            metrics_collector: None,
+            time_range: TimeRange::min_to_max(),
+            row_projector_builder,
+        };
+
+        let immutable_batches = current_mutable
+            .scan(scan_ctx, scan_req)?
+            .map(|batch_res| batch_res.map(|batch| 
batch.into_arrow_record_batch()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let time_range = current_mutable.time_range().context(InternalNoCause {
+            msg: "failed to get time range from mutable segment",
+        })?;
+        let max_key = current_mutable.max_key().context(InternalNoCause {
+            msg: "failed to get max key from mutable segment",
+        })?;
+        let min_key = current_mutable.min_key().context(InternalNoCause {
+            msg: "failed to get min key from mutable segment",
+        })?;
+        let immutable = ImmutableSegment::new(immutable_batches, time_range, 
min_key, max_key);
+
+        self.immutable_segments.push(immutable);
+
+        Ok(())
+    }
+
+    pub fn min_key(&self) -> Option<Bytes> {
+        let comparator = BytewiseComparator;
+
+        let mutable_min_key = self.mutable_segment.min_key();
+
+        let immutable_min_key = if self.immutable_segments.is_empty() {
+            None
+        } else {
+            let mut min_key = 
self.immutable_segments.first().unwrap().min_key();
+            let mut imm_iter = self.immutable_segments.iter();
+            let _ = imm_iter.next();
+            for imm in imm_iter {
+                if let std::cmp::Ordering::Greater = 
comparator.compare_key(&min_key, &imm.min_key)
+                {
+                    min_key = imm.min_key();
+                }
+            }
+
+            Some(min_key)
+        };
+
+        match (mutable_min_key, immutable_min_key) {
+            (None, None) => None,
+            (None, Some(key)) | (Some(key), None) => Some(key),
+            (Some(key1), Some(key2)) => Some(match 
comparator.compare_key(&key1, &key2) {
+                std::cmp::Ordering::Greater => key2,
+                std::cmp::Ordering::Less | std::cmp::Ordering::Equal => key1,
+            }),
+        }
+    }
+
+    pub fn max_key(&self) -> Option<Bytes> {
+        let comparator = BytewiseComparator;
+
+        let mutable_max_key = self.mutable_segment.max_key();
+
+        let immutable_max_key = if self.immutable_segments.is_empty() {
+            None
+        } else {
+            let mut max_key = 
self.immutable_segments.first().unwrap().max_key();
+            let mut imm_iter = self.immutable_segments.iter();
+            let _ = imm_iter.next();
+            for imm in imm_iter {
+                if let std::cmp::Ordering::Less = 
comparator.compare_key(&max_key, &imm.max_key) {
+                    max_key = imm.max_key();
+                }
+            }
+
+            Some(max_key)
+        };
+
+        match (mutable_max_key, immutable_max_key) {
+            (None, None) => None,
+            (None, Some(key)) | (Some(key), None) => Some(key),
+            (Some(key1), Some(key2)) => Some(match 
comparator.compare_key(&key1, &key2) {
+                std::cmp::Ordering::Less => key2,
+                std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => 
key1,
+            }),
+        }
+    }
+
+    pub fn time_range(&self) -> Option<TimeRange> {
+        let mutable_time_range = self.mutable_segment.time_range();
+
+        let immutable_time_range = if self.immutable_segments.is_empty() {
+            None
+        } else {
+            let mut time_range = 
self.immutable_segments.first().unwrap().time_range();
+            let mut imm_iter = self.immutable_segments.iter();
+            let _ = imm_iter.next();
+            for imm in imm_iter {
+                time_range = time_range.merge_range(imm.time_range());
+            }
+
+            Some(time_range)
+        };
+
+        match (mutable_time_range, immutable_time_range) {
+            (None, None) => None,
+            (None, Some(range)) | (Some(range), None) => Some(range),
+            (Some(range1), Some(range2)) => Some(range1.merge_range(range2)),
+        }
+    }
+
+    fn approximate_memory_usage(&self) -> usize {
+        let mutable_mem_usage = 
self.mutable_segment.approximate_memory_usage();
+
+        let immutable_mem_usage = self
+            .immutable_segments
+            .iter()
+            .map(|imm| imm.approximate_memory_usage())
+            .sum::<usize>();
+
+        mutable_mem_usage + immutable_mem_usage
+    }
+}
+
+/// Mutable batch
+pub(crate) struct MutableSegment(MemTableRef);
+
+impl Deref for MutableSegment {
+    type Target = MemTableRef;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+/// Builder for `MutableSegment`
+struct MutableSegmentBuilder {
+    memtable_factory: FactoryRef,
+    opts: MutableBuilderOptions,
+}
+
+impl MutableSegmentBuilder {
+    fn new(memtable_factory: FactoryRef, opts: MutableBuilderOptions) -> Self {
+        Self {
+            memtable_factory,
+            opts,
+        }
+    }
+
+    fn build(&self) -> Result<MutableSegment> {
+        let memtable_opts = Options {
+            schema: self.opts.schema.clone(),
+            arena_block_size: self.opts.arena_block_size,
+            // `creation_sequence` is meaningless in inner memtable, just set 
it to min.
+            creation_sequence: SequenceNumber::MIN,
+            collector: self.opts.collector.clone(),
+        };
+
+        let memtable = self
+            .memtable_factory
+            .create_memtable(memtable_opts)
+            .box_err()
+            .context(Internal {
+                msg: "failed to build mutable segment",
+            })?;
+
+        Ok(MutableSegment(memtable))
+    }
+}
+
+struct MutableBuilderOptions {
+    pub schema: Schema,
+
+    /// Block size of arena in bytes.
+    pub arena_block_size: u32,
+
+    /// Memory usage collector
+    pub collector: CollectorRef,
+}
+
+/// Immutable segment
+pub(crate) struct ImmutableSegment {
+    /// Record batches converted from the source `MutableSegment`
+    record_batches: Vec<ArrowRecordBatch>,
+
+    /// Time range of the source `MutableSegment`
+    time_range: TimeRange,
+
+    /// Min key of the source `MutableSegment`
+    min_key: Bytes,
+
+    /// Max key of the source `MutableSegment`
+    max_key: Bytes,
+
+    approximate_memory_size: usize,
+}
+
+impl ImmutableSegment {
+    fn new(
+        record_batches: Vec<ArrowRecordBatch>,
+        time_range: TimeRange,
+        min_key: Bytes,
+        max_key: Bytes,
+    ) -> Self {
+        let approximate_memory_size = record_batches
+            .iter()
+            .map(|batch| batch.get_array_memory_size())
+            .sum();
+
+        Self {
+            record_batches,
+            time_range,
+            min_key,
+            max_key,
+            approximate_memory_size,
+        }
+    }
+
+    pub fn time_range(&self) -> TimeRange {
+        self.time_range
+    }
+
+    pub fn min_key(&self) -> Bytes {
+        self.min_key.clone()
+    }
+
+    pub fn max_key(&self) -> Bytes {
+        self.max_key.clone()
+    }
+
+    // TODO: maybe return a iterator?
+    pub fn record_batches(&self) -> &[ArrowRecordBatch] {
+        &self.record_batches
+    }
+
+    pub fn approximate_memory_usage(&self) -> usize {
+        self.approximate_memory_size
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::{ops::Bound, sync::Arc};
+
+    use arena::NoopCollector;
+    use bytes_ext::ByteVec;
+    use codec::{memcomparable::MemComparable, Encoder};
+    use common_types::{
+        datum::Datum,
+        projected_schema::{ProjectedSchema, RowProjectorBuilder},
+        record_batch::FetchedRecordBatch,
+        row::Row,
+        schema::IndexInWriterSchema,
+        tests::{build_row, build_schema},
+    };
+
+    use super::*;
+    use crate::memtable::{
+        factory::Options,
+        key::ComparableInternalKey,
+        skiplist::factory::SkiplistMemTableFactory,
+        test_util::{TestMemtableBuilder, TestUtil},
+        MemTableRef,
+    };
+
+    struct TestMemtableBuilderImpl;
+
+    impl TestMemtableBuilder for TestMemtableBuilderImpl {
+        fn build(&self, data: &[(KeySequence, Row)]) -> MemTableRef {
+            let schema = build_schema();
+            let factory = SkiplistMemTableFactory;
+            let opts = Options {
+                schema: schema.clone(),
+                arena_block_size: 512,
+                creation_sequence: 1,
+                collector: Arc::new(NoopCollector {}),
+            };
+            let memtable = LayeredMemTable::new(&opts, Arc::new(factory), 
usize::MAX).unwrap();
+
+            let mut ctx =
+                
PutContext::new(IndexInWriterSchema::for_same_schema(schema.num_columns()));
+            let partitioned_data = data.chunks(3).collect::<Vec<_>>();
+            let chunk_num = partitioned_data.len();
+
+            for chunk in partitioned_data.iter().take(chunk_num - 1) {
+                for (seq, row) in *chunk {
+                    memtable.put(&mut ctx, *seq, row, &schema).unwrap();
+                }
+                memtable.force_switch_mutable_segment().unwrap();
+            }
+
+            let last_chunk = partitioned_data[chunk_num - 1];
+            for (seq, row) in last_chunk {
+                memtable.put(&mut ctx, *seq, row, &schema).unwrap();
+            }
+
+            Arc::new(memtable)
+        }
+    }
+
+    fn test_data() -> Vec<(KeySequence, Row)> {
+        vec![
+            (
+                KeySequence::new(1, 1),
+                build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
+            ),
+            (
+                KeySequence::new(1, 2),
+                build_row(b"b", 2, 10.0, "v2", 2000, 2_000_000),
+            ),
+            (
+                KeySequence::new(1, 4),
+                build_row(b"c", 3, 10.0, "v3", 3000, 3_000_000),
+            ),
+            (
+                KeySequence::new(2, 1),
+                build_row(b"d", 4, 10.0, "v4", 4000, 4_000_000),
+            ),
+            (
+                KeySequence::new(2, 1),
+                build_row(b"e", 5, 10.0, "v5", 5000, 5_000_000),
+            ),
+            (
+                KeySequence::new(2, 3),
+                build_row(b"f", 6, 10.0, "v6", 6000, 6_000_000),
+            ),
+            (
+                KeySequence::new(3, 4),
+                build_row(b"g", 7, 10.0, "v7", 7000, 7_000_000),
+            ),
+        ]
+    }
+
+    #[test]
+    fn test_memtable_scan() {
+        let builder = TestMemtableBuilderImpl;
+        let data = test_data();
+        let test_util = TestUtil::new(builder, data);
+        let memtable = test_util.memtable();
+        let schema = memtable.schema().clone();
+
+        // No projection.
+        let projection = (0..schema.num_columns()).collect::<Vec<_>>();
+        let expected = test_util.data();
+        test_memtable_scan_internal(
+            schema.clone(),
+            projection,
+            TimeRange::min_to_max(),
+            memtable.clone(),
+            expected,
+        );
+
+        // Projection to first three.
+        let projection = vec![0, 1, 3];
+        let expected = test_util
+            .data()
+            .iter()
+            .map(|row| {
+                let datums = vec![row[0].clone(), row[1].clone(), 
row[3].clone()];
+                Row::from_datums(datums)
+            })
+            .collect();
+        test_memtable_scan_internal(
+            schema.clone(),
+            projection,
+            TimeRange::min_to_max(),
+            memtable.clone(),
+            expected,
+        );
+
+        // No projection.
+        let projection = (0..schema.num_columns()).collect::<Vec<_>>();
+        let time_range = TimeRange::new(2.into(), 7.into()).unwrap();
+        // Memtable data after switching may be like(just showing timestamp 
column using
+        // to filter):  [1, 2, 3], [4, 5, 6], [7]
+        //
+        // And the target time range is: [2, 7)
+        //
+        // So the filter result should be: [1, 2, 3], [4, 5, 6]
+        let expected = test_util
+            .data()
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, row)| if idx < 6 { Some(row.clone()) } else { 
None })
+            .collect();
+        test_memtable_scan_internal(
+            schema.clone(),
+            projection,
+            time_range,
+            memtable.clone(),
+            expected,
+        );
+    }
+
+    #[test]
+    fn test_time_range() {
+        let builder = TestMemtableBuilderImpl;
+        let data = test_data();
+        let test_util = TestUtil::new(builder, data);
+        let memtable = test_util.memtable();
+
+        assert_eq!(TimeRange::new(1.into(), 8.into()), memtable.time_range());
+    }
+
+    #[test]
+    fn test_min_max_key() {
+        let builder = TestMemtableBuilderImpl;
+        let data = test_data();
+        let test_util = TestUtil::new(builder, data.clone());
+        let memtable = test_util.memtable();
+        let schema = memtable.schema();
+
+        // Min key
+        let key_encoder = ComparableInternalKey::new(data[0].0, schema);
+        let mut min_key = Vec::new();
+        min_key.reserve(key_encoder.estimate_encoded_size(&data[0].1));
+        key_encoder.encode(&mut min_key, &data[0].1).unwrap();
+        let key_encoder = ComparableInternalKey::new(data[0].0, schema);
+        let mut min_key = Vec::new();
+        min_key.reserve(key_encoder.estimate_encoded_size(&data[0].1));
+        key_encoder.encode(&mut min_key, &data[0].1).unwrap();
+
+        // Max key
+        let key_encoder = ComparableInternalKey::new(data[6].0, schema);
+        let mut max_key = Vec::new();
+        max_key.reserve(key_encoder.estimate_encoded_size(&data[6].1));
+        key_encoder.encode(&mut max_key, &data[6].1).unwrap();
+        let key_encoder = ComparableInternalKey::new(data[6].0, schema);
+        let mut max_key = Vec::new();
+        max_key.reserve(key_encoder.estimate_encoded_size(&data[6].1));
+        key_encoder.encode(&mut max_key, &data[6].1).unwrap();
+
+        assert_eq!(min_key, memtable.min_key().unwrap().to_vec());
+        assert_eq!(max_key, memtable.max_key().unwrap().to_vec());
+    }
+
+    fn test_memtable_scan_internal(
+        schema: Schema,
+        projection: Vec<usize>,
+        time_range: TimeRange,
+        memtable: Arc<dyn MemTable + Send + Sync>,
+        expected: Vec<Row>,
+    ) {
+        let projected_schema = ProjectedSchema::new(schema, 
Some(projection)).unwrap();
+        let fetched_schema = projected_schema.to_record_schema();
+        let table_schema = projected_schema.table_schema();
+        let row_projector_builder =
+            RowProjectorBuilder::new(fetched_schema, table_schema.clone(), 
None);
+
+        // limited by sequence
+        let scan_request = ScanRequest {
+            start_user_key: Bound::Unbounded,
+            end_user_key: Bound::Unbounded,
+            sequence: SequenceNumber::MAX,
+            row_projector_builder,
+            need_dedup: false,
+            reverse: false,
+            metrics_collector: None,
+            time_range,
+        };
+        let scan_ctx = ScanContext::default();
+        let iter = memtable.scan(scan_ctx, scan_request).unwrap();
+        check_iterator(iter, expected);
+    }
+
+    fn check_iterator<T: Iterator<Item = Result<FetchedRecordBatch>>>(
+        iter: T,
+        expected_rows: Vec<Row>,
+    ) {
+        // sort it first.
+        let mut rows = Vec::new();
+        for batch in iter {
+            let batch = batch.unwrap();
+            for row_idx in 0..batch.num_rows() {
+                rows.push(batch.clone_row_at(row_idx));
+            }
+        }
+
+        rows.sort_by(|a, b| {
+            let key1 = build_scan_key(
+                &String::from_utf8_lossy(a[0].as_varbinary().unwrap()),
+                a[1].as_timestamp().unwrap().as_i64(),
+            );
+            let key2 = build_scan_key(
+                &String::from_utf8_lossy(b[0].as_varbinary().unwrap()),
+                b[1].as_timestamp().unwrap().as_i64(),
+            );
+            BytewiseComparator.compare_key(&key1, &key2)
+        });
+
+        assert_eq!(rows, expected_rows);
+    }
+
+    fn build_scan_key(c1: &str, c2: i64) -> Bytes {
+        let mut buf = ByteVec::new();
+        let encoder = MemComparable;
+        encoder.encode(&mut buf, &Datum::from(c1)).unwrap();
+        encoder.encode(&mut buf, &Datum::from(c2)).unwrap();
+
+        Bytes::from(buf)
+    }
+}
diff --git a/analytic_engine/src/memtable/mod.rs 
b/analytic_engine/src/memtable/mod.rs
index ed3b20d3..906936bb 100644
--- a/analytic_engine/src/memtable/mod.rs
+++ b/analytic_engine/src/memtable/mod.rs
@@ -17,24 +17,28 @@
 pub mod columnar;
 pub mod factory;
 pub mod key;
+pub mod layered;
 mod reversed_iter;
 pub mod skiplist;
+pub mod test_util;
 
-use std::{ops::Bound, sync::Arc, time::Instant};
+use std::{collections::HashMap, ops::Bound, sync::Arc, time::Instant};
 
 use bytes_ext::{ByteVec, Bytes};
+use ceresdbproto::manifest;
 use common_types::{
     projected_schema::RowProjectorBuilder,
     record_batch::FetchedRecordBatch,
     row::Row,
     schema::{IndexInWriterSchema, Schema},
     time::TimeRange,
-    SequenceNumber,
+    SequenceNumber, MUTABLE_SEGMENT_SWITCH_THRESHOLD,
 };
-use generic_error::GenericError;
+use generic_error::{BoxError, GenericError};
 use macros::define_result;
 use serde::{Deserialize, Serialize};
-use snafu::{Backtrace, Snafu};
+use size_ext::ReadableSize;
+use snafu::{Backtrace, ResultExt, Snafu};
 use trace_metric::MetricsCollector;
 
 use crate::memtable::key::KeySequence;
@@ -46,13 +50,13 @@ const MEMTABLE_TYPE_COLUMNAR: &str = "columnar";
 #[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
 pub enum MemtableType {
     SkipList,
-    Columnar,
+    Column,
 }
 
 impl MemtableType {
     pub fn parse_from(s: &str) -> Self {
         if s.eq_ignore_ascii_case(MEMTABLE_TYPE_COLUMNAR) {
-            MemtableType::Columnar
+            MemtableType::Column
         } else {
             MemtableType::SkipList
         }
@@ -63,7 +67,54 @@ impl ToString for MemtableType {
     fn to_string(&self) -> String {
         match self {
             MemtableType::SkipList => MEMTABLE_TYPE_SKIPLIST.to_string(),
-            MemtableType::Columnar => MEMTABLE_TYPE_COLUMNAR.to_string(),
+            MemtableType::Column => MEMTABLE_TYPE_COLUMNAR.to_string(),
+        }
+    }
+}
+
+/// Layered memtable options
+/// If `mutable_segment_switch_threshold` is set to zero, the layered memtable
+/// will be disabled.
+#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+#[serde(default)]
+pub struct LayeredMemtableOptions {
+    pub mutable_segment_switch_threshold: ReadableSize,
+}
+
+impl Default for LayeredMemtableOptions {
+    fn default() -> Self {
+        Self {
+            mutable_segment_switch_threshold: ReadableSize::mb(3),
+        }
+    }
+}
+
+impl LayeredMemtableOptions {
+    pub fn parse_from(opts: &HashMap<String, String>) -> Result<Self> {
+        let mut options = LayeredMemtableOptions::default();
+        if let Some(v) = opts.get(MUTABLE_SEGMENT_SWITCH_THRESHOLD) {
+            let threshold = v.parse::<u64>().box_err().context(Internal {
+                msg: format!("invalid mutable segment switch threshold:{v}"),
+            })?;
+            options.mutable_segment_switch_threshold = ReadableSize(threshold);
+        }
+
+        Ok(options)
+    }
+}
+
+impl From<manifest::LayeredMemtableOptions> for LayeredMemtableOptions {
+    fn from(value: manifest::LayeredMemtableOptions) -> Self {
+        Self {
+            mutable_segment_switch_threshold: 
ReadableSize(value.mutable_segment_switch_threshold),
+        }
+    }
+}
+
+impl From<LayeredMemtableOptions> for manifest::LayeredMemtableOptions {
+    fn from(value: LayeredMemtableOptions) -> Self {
+        Self {
+            mutable_segment_switch_threshold: 
value.mutable_segment_switch_threshold.0,
         }
     }
 }
@@ -147,6 +198,11 @@ pub enum Error {
         max: usize,
         backtrace: Backtrace,
     },
+    #[snafu(display("Factory err, msg:{msg}, err:{source}"))]
+    Factory { msg: String, source: GenericError },
+
+    #[snafu(display("Factory err, msg:{msg}.\nBacktrace:\n{backtrace}"))]
+    FactoryNoCause { msg: String, backtrace: Backtrace },
 }
 
 pub const TOO_LARGE_MESSAGE: &str = "Memtable key length is too large";
@@ -208,6 +264,7 @@ pub struct ScanRequest {
     pub reverse: bool,
     /// Collector for scan metrics.
     pub metrics_collector: Option<MetricsCollector>,
+    pub time_range: TimeRange,
 }
 
 /// In memory storage for table's data.
@@ -277,7 +334,7 @@ pub trait MemTable {
     fn metrics(&self) -> Metrics;
 }
 
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct Metrics {
     /// Size of original rows.
     pub row_raw_size: usize,
diff --git a/analytic_engine/src/memtable/skiplist/factory.rs 
b/analytic_engine/src/memtable/skiplist/factory.rs
index 1e6e8b1d..3f90bcde 100644
--- a/analytic_engine/src/memtable/skiplist/factory.rs
+++ b/analytic_engine/src/memtable/skiplist/factory.rs
@@ -20,9 +20,9 @@ use arena::MonoIncArena;
 use skiplist::{BytewiseComparator, Skiplist};
 
 use crate::memtable::{
-    factory::{Factory, Options, Result},
+    factory::{Factory, Options},
     skiplist::SkiplistMemTable,
-    MemTableRef,
+    MemTableRef, Result,
 };
 
 /// Factory to create memtable
diff --git a/analytic_engine/src/memtable/skiplist/mod.rs 
b/analytic_engine/src/memtable/skiplist/mod.rs
index a71a82a6..c0a47cf3 100644
--- a/analytic_engine/src/memtable/skiplist/mod.rs
+++ b/analytic_engine/src/memtable/skiplist/mod.rs
@@ -286,8 +286,35 @@ mod tests {
     use crate::memtable::{
         factory::{Factory, Options},
         skiplist::factory::SkiplistMemTableFactory,
+        test_util::{TestMemtableBuilder, TestUtil},
+        MemTableRef,
     };
 
+    struct TestMemtableBuilderImpl;
+
+    impl TestMemtableBuilder for TestMemtableBuilderImpl {
+        fn build(&self, data: &[(KeySequence, Row)]) -> MemTableRef {
+            let schema = build_schema();
+            let factory = SkiplistMemTableFactory;
+            let memtable = factory
+                .create_memtable(Options {
+                    schema: schema.clone(),
+                    arena_block_size: 512,
+                    creation_sequence: 1,
+                    collector: Arc::new(NoopCollector {}),
+                })
+                .unwrap();
+
+            let mut ctx =
+                
PutContext::new(IndexInWriterSchema::for_same_schema(schema.num_columns()));
+            for (seq, row) in data {
+                memtable.put(&mut ctx, *seq, row, &schema).unwrap();
+            }
+
+            memtable
+        }
+    }
+
     fn test_memtable_scan_for_scan_request(
         schema: Schema,
         memtable: Arc<dyn MemTable + Send + Sync>,
@@ -309,6 +336,7 @@ mod tests {
                     need_dedup: true,
                     reverse: false,
                     metrics_collector: None,
+                    time_range: TimeRange::min_to_max(),
                 },
                 vec![
                     build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -329,6 +357,7 @@ mod tests {
                     need_dedup: true,
                     reverse: false,
                     metrics_collector: None,
+                    time_range: TimeRange::min_to_max(),
                 },
                 vec![
                     build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -348,6 +377,7 @@ mod tests {
                     need_dedup: true,
                     reverse: false,
                     metrics_collector: None,
+                    time_range: TimeRange::min_to_max(),
                 },
                 vec![
                     build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -383,6 +413,7 @@ mod tests {
                 need_dedup: true,
                 reverse: false,
                 metrics_collector: None,
+                time_range: TimeRange::min_to_max(),
             },
             vec![
                 build_row_for_two_column(b"a", 1),
@@ -401,19 +432,52 @@ mod tests {
 
     #[test]
     fn test_memtable_scan() {
-        let schema = build_schema();
-        let factory = SkiplistMemTableFactory;
-        let memtable = factory
-            .create_memtable(Options {
-                schema: schema.clone(),
-                arena_block_size: 512,
-                creation_sequence: 1,
-                collector: Arc::new(NoopCollector {}),
-            })
-            .unwrap();
-
-        let mut ctx = 
PutContext::new(IndexInWriterSchema::for_same_schema(schema.num_columns()));
-        let input = vec![
+        let data = test_data();
+        let builder = TestMemtableBuilderImpl;
+        let test_util = TestUtil::new(builder, data);
+        let memtable = test_util.memtable();
+        let schema = memtable.schema().clone();
+
+        test_memtable_scan_for_scan_request(schema.clone(), memtable.clone());
+        test_memtable_scan_for_projection(schema, memtable);
+    }
+
+    fn check_iterator<T: Iterator<Item = Result<FetchedRecordBatch>>>(
+        iter: T,
+        expected_rows: Vec<Row>,
+    ) {
+        let mut visited_rows = 0;
+        for batch in iter {
+            let batch = batch.unwrap();
+            for row_idx in 0..batch.num_rows() {
+                assert_eq!(batch.clone_row_at(row_idx), 
expected_rows[visited_rows]);
+                visited_rows += 1;
+            }
+        }
+
+        assert_eq!(visited_rows, expected_rows.len());
+    }
+
+    fn build_scan_key(c1: &str, c2: i64) -> Bytes {
+        let mut buf = ByteVec::new();
+        let encoder = MemComparable;
+        encoder.encode(&mut buf, &Datum::from(c1)).unwrap();
+        encoder.encode(&mut buf, &Datum::from(c2)).unwrap();
+
+        Bytes::from(buf)
+    }
+
+    pub fn build_row_for_two_column(key1: &[u8], key2: i64) -> Row {
+        let datums = vec![
+            Datum::Varbinary(Bytes::copy_from_slice(key1)),
+            Datum::Timestamp(Timestamp::new(key2)),
+        ];
+
+        Row::from_datums(datums)
+    }
+
+    fn test_data() -> Vec<(KeySequence, Row)> {
+        vec![
             (
                 KeySequence::new(1, 1),
                 build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
@@ -453,47 +517,6 @@ mod tests {
                 KeySequence::new(3, 4),
                 build_row(b"g", 7, 10.0, "v7", 7000, 7_000_000),
             ),
-        ];
-
-        for (seq, row) in input {
-            memtable.put(&mut ctx, seq, &row, &schema).unwrap();
-        }
-
-        test_memtable_scan_for_scan_request(schema.clone(), memtable.clone());
-        test_memtable_scan_for_projection(schema, memtable);
-    }
-
-    fn check_iterator<T: Iterator<Item = Result<FetchedRecordBatch>>>(
-        iter: T,
-        expected_rows: Vec<Row>,
-    ) {
-        let mut visited_rows = 0;
-        for batch in iter {
-            let batch = batch.unwrap();
-            for row_idx in 0..batch.num_rows() {
-                assert_eq!(batch.clone_row_at(row_idx), 
expected_rows[visited_rows]);
-                visited_rows += 1;
-            }
-        }
-
-        assert_eq!(visited_rows, expected_rows.len());
-    }
-
-    fn build_scan_key(c1: &str, c2: i64) -> Bytes {
-        let mut buf = ByteVec::new();
-        let encoder = MemComparable;
-        encoder.encode(&mut buf, &Datum::from(c1)).unwrap();
-        encoder.encode(&mut buf, &Datum::from(c2)).unwrap();
-
-        Bytes::from(buf)
-    }
-
-    pub fn build_row_for_two_column(key1: &[u8], key2: i64) -> Row {
-        let datums = vec![
-            Datum::Varbinary(Bytes::copy_from_slice(key1)),
-            Datum::Timestamp(Timestamp::new(key2)),
-        ];
-
-        Row::from_datums(datums)
+        ]
     }
 }
diff --git a/analytic_engine/src/memtable/test_util.rs 
b/analytic_engine/src/memtable/test_util.rs
new file mode 100644
index 00000000..72364b18
--- /dev/null
+++ b/analytic_engine/src/memtable/test_util.rs
@@ -0,0 +1,42 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_types::row::Row;
+
+use crate::memtable::*;
+
+pub trait TestMemtableBuilder {
+    fn build(&self, data: &[(KeySequence, Row)]) -> MemTableRef;
+}
+
+pub struct TestUtil {
+    memtable: MemTableRef,
+    data: Vec<(KeySequence, Row)>,
+}
+
+impl TestUtil {
+    pub fn new<B: TestMemtableBuilder>(builder: B, data: Vec<(KeySequence, 
Row)>) -> Self {
+        let memtable = builder.build(&data);
+
+        Self { memtable, data }
+    }
+
+    pub fn memtable(&self) -> MemTableRef {
+        self.memtable.clone()
+    }
+
+    pub fn data(&self) -> Vec<Row> {
+        self.data.iter().map(|d| d.1.clone()).collect()
+    }
+}
diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs 
b/analytic_engine/src/row_iter/record_batch_stream.rs
index dd0f4d13..15ecc8e3 100644
--- a/analytic_engine/src/row_iter/record_batch_stream.rs
+++ b/analytic_engine/src/row_iter/record_batch_stream.rs
@@ -256,6 +256,7 @@ pub fn stream_from_memtable(
         need_dedup: ctx.need_dedup,
         reverse: ctx.reverse,
         metrics_collector,
+        time_range: ctx.predicate.time_range(),
     };
 
     let iter = memtable.scan(scan_ctx, scan_req).context(ScanMemtable)?;
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs 
b/analytic_engine/src/sst/parquet/async_reader.rs
index 0b7ffdc2..305bc937 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -560,10 +560,21 @@ impl Stream for RecordBatchProjector {
                         }
                         projector.metrics.row_num += record_batch.num_rows();
 
-                        let projected_batch =
-                            
FetchedRecordBatch::try_new(&projector.row_projector, record_batch)
-                                .box_err()
-                                .context(DecodeRecordBatch {});
+                        let fetched_schema = 
projector.row_projector.fetched_schema().clone();
+                        let primary_key_indexes = projector
+                            .row_projector
+                            .primary_key_indexes()
+                            .map(|idxs| idxs.to_vec());
+                        let fetching_column_indexes =
+                            
projector.row_projector.target_record_projection_remapping();
+                        let projected_batch = FetchedRecordBatch::try_new(
+                            fetched_schema,
+                            primary_key_indexes,
+                            fetching_column_indexes,
+                            record_batch,
+                        )
+                        .box_err()
+                        .context(DecodeRecordBatch {});
 
                         Poll::Ready(Some(projected_batch))
                     }
diff --git a/analytic_engine/src/table/data.rs 
b/analytic_engine/src/table/data.rs
index 998c5c3b..4d26448f 100644
--- a/analytic_engine/src/table/data.rs
+++ b/analytic_engine/src/table/data.rs
@@ -54,6 +54,7 @@ use crate::{
     memtable::{
         columnar::factory::ColumnarMemTableFactory,
         factory::{FactoryRef as MemTableFactoryRef, Options as 
MemTableOptions},
+        layered::factory::LayeredMemtableFactory,
         skiplist::factory::SkiplistMemTableFactory,
         MemtableType,
     },
@@ -70,9 +71,7 @@ use crate::{
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Failed to create memtable, err:{}", source))]
-    CreateMemTable {
-        source: crate::memtable::factory::Error,
-    },
+    CreateMemTable { source: crate::memtable::Error },
 
     #[snafu(display(
         "Failed to find or create memtable, timestamp overflow, 
timestamp:{:?}, duration:{:?}.\nBacktrace:\n{}",
@@ -232,6 +231,9 @@ pub struct TableData {
     /// Whether enable primary key sampling
     enable_primary_key_sampling: bool,
 
+    /// Whether enable layered memtable
+    pub enable_layered_memtable: bool,
+
     /// Metrics of this table
     pub metrics: Metrics,
 
@@ -315,7 +317,22 @@ impl TableData {
 
         let memtable_factory: MemTableFactoryRef = match opts.memtable_type {
             MemtableType::SkipList => Arc::new(SkiplistMemTableFactory),
-            MemtableType::Columnar => Arc::new(ColumnarMemTableFactory),
+            MemtableType::Column => Arc::new(ColumnarMemTableFactory),
+        };
+
+        // Wrap it by `LayeredMemtable`.
+        let mutable_segment_switch_threshold = opts
+            .layered_memtable_opts
+            .mutable_segment_switch_threshold
+            .0 as usize;
+        let enable_layered_memtable = mutable_segment_switch_threshold > 0;
+        let memtable_factory = if enable_layered_memtable {
+            Arc::new(LayeredMemtableFactory::new(
+                memtable_factory,
+                mutable_segment_switch_threshold,
+            ))
+        } else {
+            memtable_factory
         };
 
         let purge_queue = purger.create_purge_queue(space_id, id);
@@ -355,6 +372,7 @@ impl TableData {
             manifest_updates: AtomicUsize::new(0),
             manifest_snapshot_every_n_updates,
             enable_primary_key_sampling,
+            enable_layered_memtable,
         })
     }
 
@@ -376,7 +394,27 @@ impl TableData {
             metrics_opt,
             enable_primary_key_sampling,
         } = config;
-        let memtable_factory = Arc::new(SkiplistMemTableFactory);
+
+        let memtable_factory: MemTableFactoryRef = match 
add_meta.opts.memtable_type {
+            MemtableType::SkipList => Arc::new(SkiplistMemTableFactory),
+            MemtableType::Column => Arc::new(ColumnarMemTableFactory),
+        };
+        // Maybe wrap it by `LayeredMemtable`.
+        let mutable_segment_switch_threshold = add_meta
+            .opts
+            .layered_memtable_opts
+            .mutable_segment_switch_threshold
+            .0 as usize;
+        let enable_layered_memtable = mutable_segment_switch_threshold > 0;
+        let memtable_factory = if enable_layered_memtable {
+            Arc::new(LayeredMemtableFactory::new(
+                memtable_factory,
+                mutable_segment_switch_threshold,
+            )) as _
+        } else {
+            memtable_factory as _
+        };
+
         let purge_queue = purger.create_purge_queue(add_meta.space_id, 
add_meta.table_id);
         let current_version =
             TableVersion::new(mem_size_options.size_sampling_interval, 
purge_queue);
@@ -410,6 +448,7 @@ impl TableData {
             manifest_updates: AtomicUsize::new(0),
             manifest_snapshot_every_n_updates,
             enable_primary_key_sampling,
+            enable_layered_memtable,
         })
     }
 
diff --git a/analytic_engine/src/table_meta_set_impl.rs 
b/analytic_engine/src/table_meta_set_impl.rs
index 00ead80a..fa69642b 100644
--- a/analytic_engine/src/table_meta_set_impl.rs
+++ b/analytic_engine/src/table_meta_set_impl.rs
@@ -134,6 +134,7 @@ impl TableMetaSetImpl {
                     collector: space.mem_usage_collector.clone(),
                     size_sampling_interval: space.mem_usage_sampling_interval,
                 };
+
                 let table_data = Arc::new(
                     TableData::new(
                         TableDesc {
diff --git a/analytic_engine/src/table_options.rs 
b/analytic_engine/src/table_options.rs
index 35b0afc6..c33a75db 100644
--- a/analytic_engine/src/table_options.rs
+++ b/analytic_engine/src/table_options.rs
@@ -33,7 +33,7 @@ use crate::{
     compaction::{
         self, CompactionStrategy, SizeTieredCompactionOptions, 
TimeWindowCompactionOptions,
     },
-    memtable::MemtableType,
+    memtable::{LayeredMemtableOptions, MemtableType},
 };
 
 const UPDATE_MODE_OVERWRITE: &str = "OVERWRITE";
@@ -98,6 +98,7 @@ pub enum Error {
         backtrace
     ))]
     ParseUpdateMode { s: String, backtrace: Backtrace },
+
     #[snafu(display(
         "Failed to parse compression, name:{}.\nBacktrace:\n{}",
         name,
@@ -134,6 +135,17 @@ pub enum Error {
         backtrace
     ))]
     HybridDeprecated { backtrace: Backtrace },
+
+    #[snafu(display(
+        "Failed to parse layered memtable options, 
err:{source}.\nBacktrace:\n{backtrace}",
+    ))]
+    ParseLayeredMemtableOptions {
+        source: crate::memtable::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Layered memtable options is 
missing.\nBacktrace:\n{backtrace}",))]
+    MissingLayeredMemtableOptions { backtrace: Backtrace },
 }
 
 define_result!(Error);
@@ -396,8 +408,11 @@ pub struct TableOptions {
     pub num_rows_per_row_group: usize,
     /// Table Compression
     pub compression: Compression,
+
     /// Memtable type
     pub memtable_type: MemtableType,
+    /// Layered memtable options
+    pub layered_memtable_opts: LayeredMemtableOptions,
 }
 
 impl TableOptions {
@@ -583,6 +598,8 @@ impl From<TableOptions> for manifest_pb::TableOptions {
             ),
         };
 
+        let layered_memtable_opts = opts.layered_memtable_opts.into();
+
         manifest_pb::TableOptions {
             segment_duration,
             enable_ttl: opts.enable_ttl,
@@ -598,6 +615,7 @@ impl From<TableOptions> for manifest_pb::TableOptions {
             storage_format_hint: Some(manifest_pb::StorageFormatHint::from(
                 opts.storage_format_hint,
             )),
+            layered_memtable_options: Some(layered_memtable_opts),
             // TODO: persist `memtable_type` in PB.
         }
     }
@@ -659,6 +677,11 @@ impl TryFrom<manifest_pb::TableOptions> for TableOptions {
         };
 
         let storage_format_hint = 
opts.storage_format_hint.context(MissingStorageFormatHint)?;
+        let layered_memtable_opts = opts
+            .layered_memtable_options
+            .context(MissingLayeredMemtableOptions)?
+            .into();
+
         let table_opts = Self {
             segment_duration,
             enable_ttl: opts.enable_ttl,
@@ -671,6 +694,7 @@ impl TryFrom<manifest_pb::TableOptions> for TableOptions {
             compression: Compression::from(compression),
             storage_format_hint: 
StorageFormatHint::try_from(storage_format_hint)?,
             memtable_type: MemtableType::SkipList,
+            layered_memtable_opts,
         };
 
         Ok(table_opts)
@@ -691,6 +715,7 @@ impl Default for TableOptions {
             compression: Compression::Zstd,
             storage_format_hint: StorageFormatHint::default(),
             memtable_type: MemtableType::SkipList,
+            layered_memtable_opts: LayeredMemtableOptions::default(),
         }
     }
 }
@@ -712,54 +737,59 @@ pub fn merge_table_options_for_alter(
 /// The options will override the old options.
 fn merge_table_options(
     options: &HashMap<String, String>,
-    table_old_opts: &TableOptions,
+    base_table_opts: &TableOptions,
     is_create: bool,
 ) -> Result<TableOptions> {
-    let mut table_opts = table_old_opts.clone();
+    let mut base_table_opts = base_table_opts.clone();
     if is_create {
         if let Some(v) = options.get(SEGMENT_DURATION) {
             if v.is_empty() {
-                table_opts.segment_duration = None;
+                base_table_opts.segment_duration = None;
             } else {
-                table_opts.segment_duration = 
Some(parse_duration(v).context(ParseDuration)?);
+                base_table_opts.segment_duration = 
Some(parse_duration(v).context(ParseDuration)?);
             }
         }
         if let Some(v) = options.get(UPDATE_MODE) {
-            table_opts.update_mode = UpdateMode::parse_from(v)?;
+            base_table_opts.update_mode = UpdateMode::parse_from(v)?;
         }
     }
 
     if let Some(v) = options.get(TTL) {
-        table_opts.ttl = parse_duration(v).context(ParseDuration)?;
+        base_table_opts.ttl = parse_duration(v).context(ParseDuration)?;
     }
     if let Some(v) = options.get(OPTION_KEY_ENABLE_TTL) {
-        table_opts.enable_ttl = v.parse::<bool>().context(ParseBool)?;
+        base_table_opts.enable_ttl = v.parse::<bool>().context(ParseBool)?;
     }
     if let Some(v) = options.get(ARENA_BLOCK_SIZE) {
         let size = parse_size(v)?;
-        table_opts.arena_block_size = size.0 as u32;
+        base_table_opts.arena_block_size = size.0 as u32;
     }
     if let Some(v) = options.get(WRITE_BUFFER_SIZE) {
         let size = parse_size(v)?;
-        table_opts.write_buffer_size = size.0 as u32;
+        base_table_opts.write_buffer_size = size.0 as u32;
     }
     if let Some(v) = options.get(COMPACTION_STRATEGY) {
-        table_opts.compaction_strategy =
+        base_table_opts.compaction_strategy =
             CompactionStrategy::parse_from(v, options).context(ParseStrategy { 
value: v })?;
     }
     if let Some(v) = options.get(NUM_ROWS_PER_ROW_GROUP) {
-        table_opts.num_rows_per_row_group = v.parse().context(ParseInt)?;
+        base_table_opts.num_rows_per_row_group = v.parse().context(ParseInt)?;
     }
     if let Some(v) = options.get(COMPRESSION) {
-        table_opts.compression = Compression::parse_from(v)?;
+        base_table_opts.compression = Compression::parse_from(v)?;
     }
     if let Some(v) = options.get(STORAGE_FORMAT) {
-        table_opts.storage_format_hint = v.as_str().try_into()?;
+        base_table_opts.storage_format_hint = v.as_str().try_into()?;
     }
     if let Some(v) = options.get(MEMTABLE_TYPE) {
-        table_opts.memtable_type = MemtableType::parse_from(v);
+        base_table_opts.memtable_type = MemtableType::parse_from(v);
     }
-    Ok(table_opts)
+
+    let layered_memtable_opts =
+        
LayeredMemtableOptions::parse_from(options).context(ParseLayeredMemtableOptions)?;
+    base_table_opts.layered_memtable_opts = layered_memtable_opts;
+
+    Ok(base_table_opts)
 }
 
 fn parse_size(v: &str) -> Result<ReadableSize> {
diff --git a/benchmarks/src/scan_memtable_bench.rs 
b/benchmarks/src/scan_memtable_bench.rs
index 72e09a05..79c9378c 100644
--- a/benchmarks/src/scan_memtable_bench.rs
+++ b/benchmarks/src/scan_memtable_bench.rs
@@ -25,7 +25,10 @@ use analytic_engine::{
     sst::meta_data::cache::MetaCacheRef,
 };
 use arena::NoopCollector;
-use common_types::projected_schema::{ProjectedSchema, RowProjectorBuilder};
+use common_types::{
+    projected_schema::{ProjectedSchema, RowProjectorBuilder},
+    time::TimeRange,
+};
 use logger::info;
 use object_store::{LocalFileSystem, Path};
 
@@ -103,6 +106,7 @@ impl ScanMemTableBench {
             reverse: false,
             metrics_collector: None,
             row_projector_builder,
+            time_range: TimeRange::min_to_max(),
         };
 
         let iter = self.memtable.scan(scan_ctx, scan_req).unwrap();
diff --git a/common_types/src/lib.rs b/common_types/src/lib.rs
index 78995e12..f54e92ae 100644
--- a/common_types/src/lib.rs
+++ b/common_types/src/lib.rs
@@ -50,6 +50,7 @@ pub const UPDATE_MODE: &str = "update_mode";
 pub const COMPRESSION: &str = "compression";
 pub const STORAGE_FORMAT: &str = "storage_format";
 pub const MEMTABLE_TYPE: &str = "memtable_type";
+pub const MUTABLE_SEGMENT_SWITCH_THRESHOLD: &str = 
"mutable_segment_switch_threshold";
 
 #[cfg(any(test, feature = "test"))]
 pub mod tests;
diff --git a/common_types/src/projected_schema.rs 
b/common_types/src/projected_schema.rs
index d0f780d8..1ef458e9 100644
--- a/common_types/src/projected_schema.rs
+++ b/common_types/src/projected_schema.rs
@@ -224,7 +224,7 @@ impl RowProjector {
 
     /// The projected indexes of all columns(existed and not exist) in the
     /// projected source schema.
-    pub fn fetched_projected_source_column_indexes(&self) -> &[Option<usize>] {
+    pub fn target_record_projection_remapping(&self) -> &[Option<usize>] {
         &self.target_record_projection_remapping
     }
 
diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs
index 1b7d610d..d4a3b24a 100644
--- a/common_types/src/record_batch.rs
+++ b/common_types/src/record_batch.rs
@@ -29,7 +29,7 @@ use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
 use crate::{
     column_block::{cast_nanosecond_to_mills, ColumnBlock, ColumnBlockBuilder},
     datum::DatumKind,
-    projected_schema::{ProjectedSchema, RowProjector},
+    projected_schema::ProjectedSchema,
     row::{
         contiguous::{ContiguousRow, ProjectedContiguousRow},
         Row, RowViewOnBatch,
@@ -279,6 +279,11 @@ impl RecordBatch {
     pub fn into_arrow_record_batch(self) -> ArrowRecordBatch {
         self.data.arrow_record_batch
     }
+
+    #[inline]
+    pub fn into_record_batch_data(self) -> RecordBatchData {
+        self.data
+    }
 }
 
 impl TryFrom<ArrowRecordBatch> for RecordBatch {
@@ -371,14 +376,16 @@ pub struct FetchedRecordBatch {
 }
 
 impl FetchedRecordBatch {
-    pub fn try_new(ctx: &RowProjector, arrow_record_batch: ArrowRecordBatch) 
-> Result<Self> {
-        let column_indexes = ctx.fetched_projected_source_column_indexes();
-        let schema = ctx.fetched_schema().clone();
-        let mut column_blocks = Vec::with_capacity(schema.num_columns());
-
+    pub fn try_new(
+        fetched_schema: RecordSchema,
+        primary_key_indexes: Option<Vec<usize>>,
+        column_indexes: &[Option<usize>],
+        arrow_record_batch: ArrowRecordBatch,
+    ) -> Result<Self> {
+        let mut column_blocks = 
Vec::with_capacity(fetched_schema.num_columns());
         let num_rows = arrow_record_batch.num_rows();
         let num_columns = arrow_record_batch.num_columns();
-        for (col_idx_opt, col_schema) in 
column_indexes.iter().zip(schema.columns()) {
+        for (col_idx_opt, col_schema) in 
column_indexes.iter().zip(fetched_schema.columns()) {
             match col_idx_opt {
                 Some(col_idx) => {
                     ensure!(
@@ -409,11 +416,11 @@ impl FetchedRecordBatch {
             }
         }
 
-        let data = RecordBatchData::new(schema.to_arrow_schema_ref(), 
column_blocks)?;
+        let data = RecordBatchData::new(fetched_schema.to_arrow_schema_ref(), 
column_blocks)?;
 
         Ok(FetchedRecordBatch {
-            schema,
-            primary_key_indexes: ctx.primary_key_indexes().map(|idxs| 
idxs.to_vec()),
+            schema: fetched_schema,
+            primary_key_indexes,
             data,
         })
     }
diff --git a/common_types/src/time.rs b/common_types/src/time.rs
index 342a5509..14266d67 100644
--- a/common_types/src/time.rs
+++ b/common_types/src/time.rs
@@ -300,6 +300,13 @@ impl TimeRange {
             self.exclusive_end.min(other.exclusive_end),
         )
     }
+
+    pub fn merge_range(&self, other: TimeRange) -> TimeRange {
+        TimeRange {
+            inclusive_start: self.inclusive_start.min(other.inclusive_start),
+            exclusive_end: self.exclusive_end.max(other.exclusive_end),
+        }
+    }
 }
 
 impl From<TimeRange> for time_range::TimeRange {
diff --git a/system_catalog/src/sys_catalog_table.rs 
b/system_catalog/src/sys_catalog_table.rs
index 00f7b061..0eb8c523 100644
--- a/system_catalog/src/sys_catalog_table.rs
+++ b/system_catalog/src/sys_catalog_table.rs
@@ -309,6 +309,11 @@ impl SysCatalogTable {
             common_types::OPTION_KEY_ENABLE_TTL.to_string(),
             DEFAULT_ENABLE_TTL.to_string(),
         );
+        // Disable layered memtable for system catalog table.
+        options.insert(
+            common_types::MUTABLE_SEGMENT_SWITCH_THRESHOLD.to_string(),
+            0.to_string(),
+        );
         let params = CreateTableParams {
             catalog_name: consts::SYSTEM_CATALOG.to_string(),
             schema_name: consts::SYSTEM_CATALOG_SCHEMA.to_string(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to