ShiKaiWi commented on code in PR #1271:
URL: 
https://github.com/apache/incubator-horaedb/pull/1271#discussion_r1444123488


##########
analytic_engine/src/memtable/layered/factory.rs:
##########
@@ -0,0 +1,50 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Skiplist memtable factory
+
+use std::sync::Arc;
+
+use crate::memtable::{
+    factory::{Factory, FactoryRef, Options},
+    layered::LayeredMemTable,
+    MemTableRef, Result,
+};

Review Comment:
   Add a newline just below this line.



##########
analytic_engine/src/memtable/layered/mod.rs:
##########
@@ -0,0 +1,730 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! MemTable based on skiplist
+
+pub mod factory;
+pub mod iter;
+
+use std::{
+    mem,
+    ops::{Bound, Deref},
+    sync::{
+        atomic::{self, AtomicU64},
+        RwLock,
+    },
+};
+
+use arena::CollectorRef;
+use arrow::record_batch::RecordBatch as ArrowRecordBatch;
+use bytes_ext::Bytes;
+use common_types::{
+    projected_schema::RowProjectorBuilder, row::Row, schema::Schema, 
time::TimeRange,
+    SequenceNumber,
+};
+use generic_error::BoxError;
+use logger::debug;
+use skiplist::{BytewiseComparator, KeyComparator};
+use snafu::{OptionExt, ResultExt};
+
+use crate::memtable::{
+    factory::{FactoryRef, Options},
+    key::KeySequence,
+    layered::iter::ColumnarIterImpl,
+    ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics 
as MemtableMetrics,
+    PutContext, Result, ScanContext, ScanRequest,
+};
+
+/// MemTable implementation based on skiplist
+pub(crate) struct LayeredMemTable {
+    /// Schema of this memtable, is immutable.
+    schema: Schema,
+
+    /// The last sequence of the rows in this memtable. Update to this field
+    /// require external synchronization.
+    last_sequence: AtomicU64,
+
+    inner: RwLock<Inner>,
+
+    mutable_switch_threshold: usize,
+}
+
+impl LayeredMemTable {
+    pub fn new(
+        opts: &Options,
+        inner_memtable_factory: FactoryRef,
+        mutable_switch_threshold: usize,
+    ) -> Result<Self> {
+        let inner = Inner::new(inner_memtable_factory, opts)?;
+
+        Ok(Self {
+            schema: opts.schema.clone(),
+            last_sequence: AtomicU64::new(opts.creation_sequence),
+            inner: RwLock::new(inner),
+            mutable_switch_threshold,
+        })
+    }
+
+    // Used for testing only
+    #[cfg(test)]
+    fn force_switch_mutable_segment(&self) -> Result<()> {
+        let inner = &mut *self.inner.write().unwrap();
+        inner.switch_mutable_segment(self.schema.clone())
+    }
+}
+
+impl MemTable for LayeredMemTable {
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn min_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().min_key()
+    }
+
+    fn max_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().max_key()
+    }
+
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        let memory_usage = {
+            let inner = self.inner.read().unwrap();
+            inner.put(ctx, sequence, row, schema)?;
+            inner.mutable_segment.0.approximate_memory_usage()
+        };
+
+        if memory_usage > self.mutable_switch_threshold {
+            debug!(
+                "LayeredMemTable put, memory_usage:{memory_usage}, 
mutable_switch_threshold:{}",
+                self.mutable_switch_threshold
+            );
+            let inner = &mut *self.inner.write().unwrap();
+            inner.switch_mutable_segment(self.schema.clone())?;
+        }
+
+        Ok(())
+    }
+
+    fn scan(&self, ctx: ScanContext, request: ScanRequest) -> 
Result<ColumnarIterPtr> {
+        debug!("LayeredMemTable scan");

Review Comment:
   Remove this debugging log?



##########
analytic_engine/src/memtable/layered/mod.rs:
##########
@@ -0,0 +1,730 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! MemTable based on skiplist
+
+pub mod factory;
+pub mod iter;
+
+use std::{
+    mem,
+    ops::{Bound, Deref},
+    sync::{
+        atomic::{self, AtomicU64},
+        RwLock,
+    },
+};
+
+use arena::CollectorRef;
+use arrow::record_batch::RecordBatch as ArrowRecordBatch;
+use bytes_ext::Bytes;
+use common_types::{
+    projected_schema::RowProjectorBuilder, row::Row, schema::Schema, 
time::TimeRange,
+    SequenceNumber,
+};
+use generic_error::BoxError;
+use logger::debug;
+use skiplist::{BytewiseComparator, KeyComparator};
+use snafu::{OptionExt, ResultExt};
+
+use crate::memtable::{
+    factory::{FactoryRef, Options},
+    key::KeySequence,
+    layered::iter::ColumnarIterImpl,
+    ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics 
as MemtableMetrics,
+    PutContext, Result, ScanContext, ScanRequest,
+};
+
+/// MemTable implementation based on skiplist
+pub(crate) struct LayeredMemTable {
+    /// Schema of this memtable, is immutable.
+    schema: Schema,
+
+    /// The last sequence of the rows in this memtable. Update to this field
+    /// require external synchronization.
+    last_sequence: AtomicU64,
+
+    inner: RwLock<Inner>,
+
+    mutable_switch_threshold: usize,
+}
+
+impl LayeredMemTable {
+    pub fn new(
+        opts: &Options,
+        inner_memtable_factory: FactoryRef,
+        mutable_switch_threshold: usize,
+    ) -> Result<Self> {
+        let inner = Inner::new(inner_memtable_factory, opts)?;
+
+        Ok(Self {
+            schema: opts.schema.clone(),
+            last_sequence: AtomicU64::new(opts.creation_sequence),
+            inner: RwLock::new(inner),
+            mutable_switch_threshold,
+        })
+    }
+
+    // Used for testing only
+    #[cfg(test)]
+    fn force_switch_mutable_segment(&self) -> Result<()> {
+        let inner = &mut *self.inner.write().unwrap();
+        inner.switch_mutable_segment(self.schema.clone())
+    }
+}
+
+impl MemTable for LayeredMemTable {
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn min_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().min_key()
+    }
+
+    fn max_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().max_key()
+    }
+
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        let memory_usage = {
+            let inner = self.inner.read().unwrap();
+            inner.put(ctx, sequence, row, schema)?;
+            inner.mutable_segment.0.approximate_memory_usage()
+        };
+
+        if memory_usage > self.mutable_switch_threshold {
+            debug!(
+                "LayeredMemTable put, memory_usage:{memory_usage}, 
mutable_switch_threshold:{}",
+                self.mutable_switch_threshold
+            );
+            let inner = &mut *self.inner.write().unwrap();
+            inner.switch_mutable_segment(self.schema.clone())?;
+        }
+
+        Ok(())
+    }
+
+    fn scan(&self, ctx: ScanContext, request: ScanRequest) -> 
Result<ColumnarIterPtr> {
+        debug!("LayeredMemTable scan");
+        let inner = self.inner.read().unwrap();
+        inner.scan(&self.schema, ctx, request)
+    }
+
+    fn approximate_memory_usage(&self) -> usize {
+        self.inner.read().unwrap().approximate_memory_usage()
+    }
+
+    fn set_last_sequence(&self, sequence: SequenceNumber) -> Result<()> {
+        self.last_sequence
+            .store(sequence, atomic::Ordering::Relaxed);
+        Ok(())
+    }
+
+    fn last_sequence(&self) -> SequenceNumber {
+        self.last_sequence.load(atomic::Ordering::Relaxed)
+    }
+
+    fn time_range(&self) -> Option<TimeRange> {
+        let inner = self.inner.read().unwrap();
+        inner.time_range()
+    }
+
+    fn metrics(&self) -> MemtableMetrics {
+        // FIXME: stats and return metrics
+        MemtableMetrics::default()
+    }
+}
+
+/// Layered memtable inner
+struct Inner {
+    mutable_segment_builder: MutableSegmentBuilder,
+    mutable_segment: MutableSegment,
+    immutable_segments: Vec<ImmutableSegment>,
+}
+
+impl Inner {
+    fn new(memtable_factory: FactoryRef, opts: &Options) -> Result<Self> {
+        let builder_opts = MutableBuilderOptions {
+            schema: opts.schema.clone(),
+            arena_block_size: opts.arena_block_size,
+            collector: opts.collector.clone(),
+        };
+        let mutable_segment_builder = 
MutableSegmentBuilder::new(memtable_factory, builder_opts);
+
+        // Build the first mutable batch.
+        let init_mutable_segment = mutable_segment_builder.build()?;
+
+        Ok(Self {
+            mutable_segment_builder,
+            mutable_segment: init_mutable_segment,
+            immutable_segments: vec![],
+        })
+    }
+
+    /// Scan batches including `mutable` and `immutable`s.
+    #[inline]
+    fn scan(
+        &self,
+        schema: &Schema,
+        ctx: ScanContext,
+        request: ScanRequest,
+    ) -> Result<ColumnarIterPtr> {
+        let iter = ColumnarIterImpl::new(
+            schema,
+            ctx,
+            request,
+            &self.mutable_segment,
+            &self.immutable_segments,
+        )?;
+        Ok(Box::new(iter))
+    }
+
+    #[inline]
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        self.mutable_segment.put(ctx, sequence, row, schema)
+    }
+
+    fn switch_mutable_segment(&mut self, schema: Schema) -> Result<()> {
+        let imm_num = self.immutable_segments.len();
+        debug!("LayeredMemTable switch_mutable_segment, imm_num:{imm_num}");
+
+        // Build a new mutable segment, and replace current's.
+        let new_mutable = self.mutable_segment_builder.build()?;
+        let current_mutable = mem::replace(&mut self.mutable_segment, 
new_mutable);
+        let fetched_schema = schema.to_record_schema();
+
+        // Convert current's to immutable.
+        let scan_ctx = ScanContext::default();
+        let row_projector_builder = RowProjectorBuilder::new(fetched_schema, 
schema, None);
+        let scan_req = ScanRequest {
+            start_user_key: Bound::Unbounded,
+            end_user_key: Bound::Unbounded,
+            sequence: common_types::MAX_SEQUENCE_NUMBER,
+            need_dedup: false,
+            reverse: false,
+            metrics_collector: None,
+            time_range: TimeRange::min_to_max(),
+            row_projector_builder,
+        };
+
+        let immutable_batches = current_mutable
+            .scan(scan_ctx, scan_req)?
+            .map(|batch_res| batch_res.map(|batch| 
batch.into_arrow_record_batch()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let time_range = current_mutable.time_range().context(InternalNoCause {
+            msg: "failed to get time range from mutable segment",
+        })?;
+        let max_key = current_mutable.max_key().context(InternalNoCause {
+            msg: "failed to get time range from mutable segment",
+        })?;
+        let min_key = current_mutable.min_key().context(InternalNoCause {
+            msg: "failed to get time range from mutable segment",

Review Comment:
   ```suggestion
               msg: "failed to get min key from mutable segment",
   ```



##########
analytic_engine/src/memtable/test_util.rs:
##########
@@ -0,0 +1,42 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_types::row::Row;
+
+use super::*;

Review Comment:
   Use the absolute path?



##########
analytic_engine/src/memtable/layered/iter.rs:
##########
@@ -0,0 +1,120 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Skiplist memtable iterator
+
+use common_types::{record_batch::FetchedRecordBatch, schema::Schema, 
time::TimeRange};
+use generic_error::BoxError;
+use snafu::ResultExt;
+
+use crate::memtable::{
+    layered::{ImmutableSegment, MutableSegment},
+    ColumnarIterPtr, Internal, ProjectSchema, Result, ScanContext, ScanRequest,
+};
+
+/// Columnar iterator for [LayeredMemTable]
+pub(crate) struct ColumnarIterImpl {
+    selected_batch_iter: ColumnarIterPtr,
+}
+
+impl ColumnarIterImpl {
+    pub fn new(
+        memtable_schema: &Schema,
+        ctx: ScanContext,
+        request: ScanRequest,
+        mutable: &MutableSegment,
+        immutables: &[ImmutableSegment],
+    ) -> Result<Self> {
+        // Create projection for the memtable schema
+        let row_projector = request
+            .row_projector_builder
+            .build(memtable_schema)
+            .context(ProjectSchema)?;
+
+        let (maybe_mutable, selected_immutables) =
+            Self::filter_by_time_range(mutable, immutables, 
request.time_range);
+
+        let immutable_batches = selected_immutables
+            .flat_map(|imm| {
+                imm.record_batches().iter().map(|batch| {
+                    // TODO: reduce clone here.
+                    let fetched_schema = 
row_projector.fetched_schema().clone();
+                    let primary_key_indexes = row_projector
+                        .primary_key_indexes()
+                        .map(|idxs| idxs.to_vec());

Review Comment:
   Shall we avoid the two clones here?



##########
analytic_engine/src/lib.rs:
##########
@@ -83,6 +83,11 @@ pub struct Config {
     /// The ratio of table's write buffer size to trigger preflush, and it
     /// should be in the range (0, 1].
     pub preflush_write_buffer_size_ratio: f32,
+
+    /// Mutable segment switch threshold for memtable

Review Comment:
   ```suggestion
       /// The threshold to trigger switching mutable segment of memtable.
   ```



##########
analytic_engine/src/sst/parquet/async_reader.rs:
##########
@@ -540,10 +540,21 @@ impl Stream for RecordBatchProjector {
                         }
                         projector.metrics.row_num += record_batch.num_rows();
 
-                        let projected_batch =
-                            
FetchedRecordBatch::try_new(&projector.row_projector, record_batch)
-                                .box_err()
-                                .context(DecodeRecordBatch {});
+                        let fetched_schema = 
projector.row_projector.fetched_schema().clone();
+                        let primary_key_indexes = projector
+                            .row_projector
+                            .primary_key_indexes()
+                            .map(|idxs| idxs.to_vec());

Review Comment:
   There is no clone before, why do clone after this change set?



##########
analytic_engine/src/table/data.rs:
##########
@@ -315,7 +317,23 @@ impl TableData {
 
         let memtable_factory: MemTableFactoryRef = match opts.memtable_type {
             MemtableType::SkipList => Arc::new(SkiplistMemTableFactory),
-            MemtableType::Columnar => Arc::new(ColumnarMemTableFactory),
+            MemtableType::Column => Arc::new(ColumnarMemTableFactory),
+        };
+
+        // Wrap it by `LayeredMemtable`.
+        let mutable_segment_switch_threshold = opts
+            .layered_memtable_opts
+            .mutable_segment_switch_threshold
+            .0 as usize;
+        let enable_layered_memtable = mutable_segment_switch_threshold > 0;
+        let memtable_factory = if enable_layered_memtable {
+            debug!("Enable layered memtable, mutable_segment_type:{}, 
mutable_segment_switch_threshold:{mutable_segment_switch_threshold}", 
opts.memtable_type.to_string());

Review Comment:
   Is this debugging log necessary? How about removing it? (At least avoid the 
`to_string` clone.)



##########
analytic_engine/src/memtable/layered/mod.rs:
##########
@@ -0,0 +1,730 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! MemTable based on skiplist
+
+pub mod factory;
+pub mod iter;
+
+use std::{
+    mem,
+    ops::{Bound, Deref},
+    sync::{
+        atomic::{self, AtomicU64},
+        RwLock,
+    },
+};
+
+use arena::CollectorRef;
+use arrow::record_batch::RecordBatch as ArrowRecordBatch;
+use bytes_ext::Bytes;
+use common_types::{
+    projected_schema::RowProjectorBuilder, row::Row, schema::Schema, 
time::TimeRange,
+    SequenceNumber,
+};
+use generic_error::BoxError;
+use logger::debug;
+use skiplist::{BytewiseComparator, KeyComparator};
+use snafu::{OptionExt, ResultExt};
+
+use crate::memtable::{
+    factory::{FactoryRef, Options},
+    key::KeySequence,
+    layered::iter::ColumnarIterImpl,
+    ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics 
as MemtableMetrics,
+    PutContext, Result, ScanContext, ScanRequest,
+};
+
+/// MemTable implementation based on skiplist
+pub(crate) struct LayeredMemTable {
+    /// Schema of this memtable, is immutable.
+    schema: Schema,
+
+    /// The last sequence of the rows in this memtable. Update to this field
+    /// require external synchronization.
+    last_sequence: AtomicU64,
+
+    inner: RwLock<Inner>,
+
+    mutable_switch_threshold: usize,
+}
+
+impl LayeredMemTable {
+    pub fn new(
+        opts: &Options,
+        inner_memtable_factory: FactoryRef,
+        mutable_switch_threshold: usize,
+    ) -> Result<Self> {
+        let inner = Inner::new(inner_memtable_factory, opts)?;
+
+        Ok(Self {
+            schema: opts.schema.clone(),
+            last_sequence: AtomicU64::new(opts.creation_sequence),
+            inner: RwLock::new(inner),
+            mutable_switch_threshold,
+        })
+    }
+
+    // Used for testing only
+    #[cfg(test)]
+    fn force_switch_mutable_segment(&self) -> Result<()> {
+        let inner = &mut *self.inner.write().unwrap();
+        inner.switch_mutable_segment(self.schema.clone())
+    }
+}
+
+impl MemTable for LayeredMemTable {
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn min_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().min_key()
+    }
+
+    fn max_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().max_key()
+    }
+
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        let memory_usage = {
+            let inner = self.inner.read().unwrap();
+            inner.put(ctx, sequence, row, schema)?;
+            inner.mutable_segment.0.approximate_memory_usage()
+        };
+
+        if memory_usage > self.mutable_switch_threshold {
+            debug!(
+                "LayeredMemTable put, memory_usage:{memory_usage}, 
mutable_switch_threshold:{}",
+                self.mutable_switch_threshold
+            );
+            let inner = &mut *self.inner.write().unwrap();
+            inner.switch_mutable_segment(self.schema.clone())?;
+        }
+
+        Ok(())
+    }
+
+    fn scan(&self, ctx: ScanContext, request: ScanRequest) -> 
Result<ColumnarIterPtr> {
+        debug!("LayeredMemTable scan");
+        let inner = self.inner.read().unwrap();
+        inner.scan(&self.schema, ctx, request)
+    }
+
+    fn approximate_memory_usage(&self) -> usize {
+        self.inner.read().unwrap().approximate_memory_usage()
+    }
+
+    fn set_last_sequence(&self, sequence: SequenceNumber) -> Result<()> {
+        self.last_sequence
+            .store(sequence, atomic::Ordering::Relaxed);
+        Ok(())
+    }
+
+    fn last_sequence(&self) -> SequenceNumber {
+        self.last_sequence.load(atomic::Ordering::Relaxed)
+    }
+
+    fn time_range(&self) -> Option<TimeRange> {
+        let inner = self.inner.read().unwrap();
+        inner.time_range()
+    }
+
+    fn metrics(&self) -> MemtableMetrics {
+        // FIXME: stats and return metrics
+        MemtableMetrics::default()
+    }
+}
+
+/// Layered memtable inner
+struct Inner {
+    mutable_segment_builder: MutableSegmentBuilder,
+    mutable_segment: MutableSegment,
+    immutable_segments: Vec<ImmutableSegment>,
+}
+
+impl Inner {
+    fn new(memtable_factory: FactoryRef, opts: &Options) -> Result<Self> {
+        let builder_opts = MutableBuilderOptions {
+            schema: opts.schema.clone(),
+            arena_block_size: opts.arena_block_size,
+            collector: opts.collector.clone(),
+        };
+        let mutable_segment_builder = 
MutableSegmentBuilder::new(memtable_factory, builder_opts);
+
+        // Build the first mutable batch.
+        let init_mutable_segment = mutable_segment_builder.build()?;
+
+        Ok(Self {
+            mutable_segment_builder,
+            mutable_segment: init_mutable_segment,
+            immutable_segments: vec![],
+        })
+    }
+
+    /// Scan batches including `mutable` and `immutable`s.
+    #[inline]
+    fn scan(
+        &self,
+        schema: &Schema,
+        ctx: ScanContext,
+        request: ScanRequest,
+    ) -> Result<ColumnarIterPtr> {
+        let iter = ColumnarIterImpl::new(
+            schema,
+            ctx,
+            request,
+            &self.mutable_segment,
+            &self.immutable_segments,
+        )?;
+        Ok(Box::new(iter))
+    }
+
+    #[inline]
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        self.mutable_segment.put(ctx, sequence, row, schema)
+    }
+
+    fn switch_mutable_segment(&mut self, schema: Schema) -> Result<()> {
+        let imm_num = self.immutable_segments.len();
+        debug!("LayeredMemTable switch_mutable_segment, imm_num:{imm_num}");
+
+        // Build a new mutable segment, and replace current's.
+        let new_mutable = self.mutable_segment_builder.build()?;
+        let current_mutable = mem::replace(&mut self.mutable_segment, 
new_mutable);
+        let fetched_schema = schema.to_record_schema();
+
+        // Convert current's to immutable.
+        let scan_ctx = ScanContext::default();
+        let row_projector_builder = RowProjectorBuilder::new(fetched_schema, 
schema, None);
+        let scan_req = ScanRequest {
+            start_user_key: Bound::Unbounded,
+            end_user_key: Bound::Unbounded,
+            sequence: common_types::MAX_SEQUENCE_NUMBER,
+            need_dedup: false,
+            reverse: false,
+            metrics_collector: None,
+            time_range: TimeRange::min_to_max(),
+            row_projector_builder,
+        };
+
+        let immutable_batches = current_mutable
+            .scan(scan_ctx, scan_req)?
+            .map(|batch_res| batch_res.map(|batch| 
batch.into_arrow_record_batch()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let time_range = current_mutable.time_range().context(InternalNoCause {
+            msg: "failed to get time range from mutable segment",
+        })?;
+        let max_key = current_mutable.max_key().context(InternalNoCause {
+            msg: "failed to get time range from mutable segment",

Review Comment:
   ```suggestion
               msg: "failed to get max key from mutable segment",
   ```



##########
analytic_engine/src/memtable/mod.rs:
##########
@@ -63,7 +67,54 @@ impl ToString for MemtableType {
     fn to_string(&self) -> String {
         match self {
             MemtableType::SkipList => MEMTABLE_TYPE_SKIPLIST.to_string(),
-            MemtableType::Columnar => MEMTABLE_TYPE_COLUMNAR.to_string(),
+            MemtableType::Column => MEMTABLE_TYPE_COLUMNAR.to_string(),
+        }
+    }
+}
+
+/// Layered memtable options
+/// If `mutable_segment_switch_threshold` is set zero, layered memtable will be
+/// disable.
+#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+#[serde(default)]
+pub struct LayeredMemtableOptions {
+    pub mutable_segment_switch_threshold: ReadableSize,

Review Comment:
   Shall we declare it as usize? It seems there is no need to use ReadableSize 
here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to