Rachelint commented on code in PR #1271:
URL: 
https://github.com/apache/incubator-horaedb/pull/1271#discussion_r1438122474


##########
analytic_engine/src/memtable/layered/mod.rs:
##########
@@ -0,0 +1,459 @@
+// Copyright 2023 The CeresDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! MemTable based on layered segments (a mutable segment plus immutable ones)
+
+pub mod factory;
+pub mod iter;
+
+use std::{
+    fmt::format,
+    mem,
+    ops::{Bound, Deref},
+    sync::{
+        atomic::{self, AtomicI64, AtomicU64, AtomicUsize},
+        Arc, RwLock,
+    },
+};
+
+use arena::{Arena, BasicStats, CollectorRef};
+use bytes_ext::Bytes;
+use codec::Encoder;
+use common_types::{
+    projected_schema::ProjectedSchema,
+    record_batch::RecordBatchWithKey,
+    row::{contiguous::ContiguousRowWriter, Row},
+    schema::Schema,
+    time::TimeRange,
+    SequenceNumber,
+};
+use generic_error::BoxError;
+use logger::{debug, trace};
+use skiplist::{BytewiseComparator, KeyComparator, Skiplist};
+use snafu::{ensure, OptionExt, ResultExt};
+
+use crate::memtable::{
+    factory::{Factory, FactoryRef, Options},
+    key::{ComparableInternalKey, KeySequence},
+    layered::iter::ColumnarIterImpl,
+    ColumnarIterPtr, EncodeInternalKey, Internal, InternalNoCause, 
InvalidPutSequence, InvalidRow,
+    MemTable, MemTableRef, Metrics as MemtableMetrics, PutContext, Result, 
ScanContext,
+    ScanRequest, TimestampNotFound,
+};
+
+/// Per-memtable write statistics, counted atomically.
+///
+/// NOTE(review): nothing in this file reads or updates these yet —
+/// `LayeredMemTable::metrics()` returns a default value (see the FIXME
+/// there), so these counters are currently dead weight.
+#[derive(Default, Debug)]
+struct Metrics {
+    // Bytes of rows as received, before encoding.
+    row_raw_size: AtomicUsize,
+    // Bytes of rows after encoding.
+    row_encoded_size: AtomicUsize,
+    // Number of rows written.
+    row_count: AtomicUsize,
+}
+
+/// MemTable implementation that layers one writable (mutable) segment over a
+/// list of frozen (immutable) segments; the segments themselves are produced
+/// by a pluggable inner memtable factory.
+pub(crate) struct LayeredMemTable {
+    /// Schema of this memtable, is immutable.
+    schema: Schema,
+
+    /// The last sequence of the rows in this memtable. Update to this field
+    /// require external synchronization.
+    last_sequence: AtomicU64,
+
+    // Mutable + immutable segments, guarded by a single RwLock: reads and
+    // writes to the current mutable segment take the read lock; switching
+    // the mutable segment takes the write lock.
+    inner: RwLock<Inner>,
+
+    // When the inner approximate memory usage exceeds this threshold, the
+    // current mutable segment is frozen into an immutable one (presumably
+    // bytes — confirm the unit against the caller that configures it).
+    mutable_switch_threshold: usize,
+}
+
+impl LayeredMemTable {
+    /// Creates a layered memtable.
+    ///
+    /// `inner_memtable_factory` builds the underlying segments, and
+    /// `mutable_switch_threshold` is the approximate-memory-usage limit at
+    /// which the mutable segment gets frozen. Fails if the factory cannot
+    /// build the initial mutable segment.
+    pub fn new(
+        opts: &Options,
+        inner_memtable_factory: FactoryRef,
+        mutable_switch_threshold: usize,
+    ) -> Result<Self> {
+        let inner = Inner::new(inner_memtable_factory, opts)?;
+
+        Ok(Self {
+            schema: opts.schema.clone(),
+            last_sequence: AtomicU64::new(opts.creation_sequence),
+            inner: RwLock::new(inner),
+            mutable_switch_threshold,
+        })
+    }
+}
+
+impl MemTable for LayeredMemTable {
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn min_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().min_key()
+    }
+
+    fn max_key(&self) -> Option<Bytes> {
+        self.inner.read().unwrap().max_key()
+    }
+
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        // Write under the shared read lock (`Inner::put` only needs
+        // `&self`) and sample memory usage while still holding it.
+        let memory_usage = {
+            let inner = self.inner.read().unwrap();
+            inner.put(ctx, sequence, row, schema)?;
+            inner.approximate_memory_usage()
+        };
+
+        // NOTE(review): the read lock is released before the write lock is
+        // acquired, so two concurrent writers can both see usage above the
+        // threshold and switch twice, freezing an empty segment — confirm
+        // this is benign, or re-check the usage under the write lock.
+        if memory_usage > self.mutable_switch_threshold {
+            let inner = &mut *self.inner.write().unwrap();
+            inner.switch_mutable_segment(self.schema.clone())?;
+        }
+
+        Ok(())
+    }
+
+    fn scan(&self, ctx: ScanContext, request: ScanRequest) -> 
Result<ColumnarIterPtr> {
+        let inner = self.inner.read().unwrap();
+        inner.scan(ctx, request)
+    }
+
+    fn approximate_memory_usage(&self) -> usize {
+        self.inner.read().unwrap().approximate_memory_usage()
+    }
+
+    fn set_last_sequence(&self, sequence: SequenceNumber) -> Result<()> {
+        // `store` returns `()`, so this is `Ok(())` with the store as a
+        // side effect. Relaxed ordering is enough because, per the field's
+        // doc, updates require external synchronization anyway.
+        Ok(self
+            .last_sequence
+            .store(sequence, atomic::Ordering::Relaxed))
+    }
+
+    fn last_sequence(&self) -> SequenceNumber {
+        self.last_sequence.load(atomic::Ordering::Relaxed)
+    }
+
+    fn time_range(&self) -> Option<TimeRange> {
+        let inner = self.inner.read().unwrap();
+        inner.time_range()
+    }
+
+    fn metrics(&self) -> MemtableMetrics {
+        // FIXME: stats and return metrics
+        MemtableMetrics::default()
+    }
+}
+
+/// Layered memtable inner
+struct Inner {
+    // Builds a fresh mutable segment each time the current one is switched
+    // out.
+    mutable_segment_builder: MutableSegmentBuilder,
+    // The segment currently accepting writes.
+    mutable_segment: MutableSegment,
+    // Frozen segments; still visited by `scan` alongside the mutable one.
+    immutable_segments: Vec<ImmutableSegment>,
+}
+
+impl Inner {
+    /// Creates the inner state: keeps the segment builder (factory plus the
+    /// options it needs) and builds the first, empty mutable segment with
+    /// it. No immutable segments exist yet.
+    fn new(memtable_factory: FactoryRef, opts: &Options) -> Result<Self> {
+        let builder_opts = MutableBuilderOptions {
+            schema: opts.schema.clone(),
+            arena_block_size: opts.arena_block_size,
+            collector: opts.collector.clone(),
+        };
+
+        let mutable_segment_builder = MutableSegmentBuilder {
+            memtable_factory,
+            opts: builder_opts,
+        };
+
+        // Build the first mutable batch.
+        let init_mutable_segment = mutable_segment_builder.build()?;
+
+        Ok(Self {
+            mutable_segment_builder,
+            mutable_segment: init_mutable_segment,
+            immutable_segments: vec![],
+        })
+    }
+
+    /// Scan batches including `mutable` and `immutable`s.
+    /// Scan batches including `mutable` and `immutable`s.
+    ///
+    /// Builds a columnar iterator over the current mutable segment and all
+    /// frozen segments; the iterator borrows them, so callers hold the
+    /// outer read lock for its lifetime.
+    #[inline]
+    fn scan(&self, ctx: ScanContext, request: ScanRequest) -> 
Result<ColumnarIterPtr> {
+        let iter = ColumnarIterImpl::new(
+            ctx,
+            request,
+            &self.mutable_segment,
+            &self.immutable_segments,
+        )?;
+        Ok(Box::new(iter))
+    }
+
+    /// Writes one row into the current mutable segment.
+    ///
+    /// Takes `&self` (the segment's own `put` handles interior mutation),
+    /// which is what lets `LayeredMemTable::put` write under the shared
+    /// read lock.
+    #[inline]
+    fn put(
+        &self,
+        ctx: &mut PutContext,
+        sequence: KeySequence,
+        row: &Row,
+        schema: &Schema,
+    ) -> Result<()> {
+        self.mutable_segment.put(ctx, sequence, row, schema)
+    }
+
+    fn switch_mutable_segment(&mut self, schema: Schema) -> Result<()> {
+        // Build a new mutable segment, and replace current's.
+        let new_mutable = self.mutable_segment_builder.build()?;
+        let current_mutable = mem::replace(&mut self.mutable_segment, 
new_mutable);
+
+        // Convert current's to immutable.
+        let scan_ctx = ScanContext::default();
+        let scan_req = ScanRequest {
+            start_user_key: Bound::Unbounded,
+            end_user_key: Bound::Unbounded,
+            sequence: common_types::MAX_SEQUENCE_NUMBER,
+            projected_schema: ProjectedSchema::no_projection(schema),
+            need_dedup: false,
+            reverse: false,
+            metrics_collector: None,
+            time_range: TimeRange::min_to_max(),
+        };
+
+        let immutable_batches = current_mutable

Review Comment:
   Yes, I plan to do that in later PRs, to keep the initial implementation 
simple.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to