jiacai2050 commented on code in PR #1552:
URL: https://github.com/apache/horaedb/pull/1552#discussion_r1713341235


##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::{BufMut, BytesMut};
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use crc32fast::Hasher;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open or create file: {}", source))]
+    FileOpen { source: io::Error },
+
+    #[snafu(display("Failed to map file to memory: {}", source))]
+    Mmap { source: io::Error },
+
+    #[snafu(display("Segment full"))]
+    SegmentFull,
+
+    #[snafu(display("Failed to append data to segment file: {}", source))]
+    SegmentAppend { source: io::Error },
+
+    #[snafu(display("Failed to flush mmap: {}", source))]
+    Flush { source: io::Error },
+
+    #[snafu(display(
+        "Attempted to read beyond segment size. Offset: {}, Size: {}, 
FileSize: {}",
+        offset,
+        size,
+        file_size
+    ))]
+    ReadOutOfBounds {
+        offset: u64,
+        size: u64,
+        file_size: u64,
+    },
+
+    #[snafu(display("Invalid segment header"))]
+    InvalidHeader,
+
+    #[snafu(display("Segment not open, id:{}", id))]
+    SegmentNotOpen { id: u64 },
+
+    #[snafu(display("Segment not found, id:{}", id))]
+    SegmentNotFound { id: u64 },
+
+    #[snafu(display("Unable to convert slice: {}", source))]
+    Conversion {
+        source: std::array::TryFromSliceError,
+    },
+
+    #[snafu(display("{}", source))]
+    Encoding { source: GenericError },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const HEADER: &str = "HoraeDB WAL";
+const CRC_SIZE: usize = 4;
+const RECORD_LENGTH_SIZE: usize = 4;
+const KEY_LENGTH_SIZE: usize = 2;
+const VALUE_LENGTH_SIZE: usize = 4;
+// todo: make MAX_FILE_SIZE configurable
+const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
+#[derive(Debug)]
+pub struct Segment {

Review Comment:
   Add comments explaining the format used in one segment file, like 
https://github.com/apache/horaedb/blob/f4d4a0a94907c5a4709761c47aaa15ac810cae7c/src/wal/src/message_queue_impl/encoding.rs#L211
   
   Also we should include one byte for `version`, so we can change the format 
in the future.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::{BufMut, BytesMut};
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use crc32fast::Hasher;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open or create file: {}", source))]
+    FileOpen { source: io::Error },
+
+    #[snafu(display("Failed to map file to memory: {}", source))]
+    Mmap { source: io::Error },
+
+    #[snafu(display("Segment full"))]
+    SegmentFull,
+
+    #[snafu(display("Failed to append data to segment file: {}", source))]
+    SegmentAppend { source: io::Error },
+
+    #[snafu(display("Failed to flush mmap: {}", source))]
+    Flush { source: io::Error },
+
+    #[snafu(display(
+        "Attempted to read beyond segment size. Offset: {}, Size: {}, 
FileSize: {}",
+        offset,
+        size,
+        file_size
+    ))]
+    ReadOutOfBounds {
+        offset: u64,
+        size: u64,
+        file_size: u64,
+    },
+
+    #[snafu(display("Invalid segment header"))]
+    InvalidHeader,
+
+    #[snafu(display("Segment not open, id:{}", id))]
+    SegmentNotOpen { id: u64 },
+
+    #[snafu(display("Segment not found, id:{}", id))]
+    SegmentNotFound { id: u64 },
+
+    #[snafu(display("Unable to convert slice: {}", source))]
+    Conversion {
+        source: std::array::TryFromSliceError,
+    },
+
+    #[snafu(display("{}", source))]
+    Encoding { source: GenericError },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const HEADER: &str = "HoraeDB WAL";
+const CRC_SIZE: usize = 4;
+const RECORD_LENGTH_SIZE: usize = 4;
+const KEY_LENGTH_SIZE: usize = 2;
+const VALUE_LENGTH_SIZE: usize = 4;
+// todo: make MAX_FILE_SIZE configurable
+const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
+#[derive(Debug)]
+pub struct Segment {
+    path: String,
+    id: u64,
+    size: u64,
+    min_seq: SequenceNumber,
+    max_seq: SequenceNumber,
+    is_open: bool,
+    file: Option<File>,
+    mmap: Option<MmapMut>,
+    record_position: Option<Vec<Position>>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Position {
+    start: u64,
+    end: u64,
+}
+
+impl Segment {
+    pub fn new(path: String, segment_id: u64) -> Result<Segment> {
+        if !Path::new(&path).exists() {
+            let mut file = File::create(&path).context(FileOpen)?;
+            file.write_all(HEADER.as_bytes()).context(FileOpen)?;
+        }
+        Ok(Segment {
+            path,
+            id: segment_id,
+            size: HEADER.len() as u64,
+            is_open: false,
+            min_seq: MAX_SEQUENCE_NUMBER,
+            max_seq: MIN_SEQUENCE_NUMBER,
+            file: None,
+            mmap: None,
+            record_position: None,
+        })
+    }
+
+    pub fn open(&mut self) -> Result<()> {
+        let file = OpenOptions::new()
+            .read(true)
+            .append(true)
+            .open(&self.path)
+            .context(FileOpen)?;
+
+        let metadata = file.metadata().context(FileOpen)?;
+        let size = metadata.len();
+
+        let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };
+
+        // Validate segment header
+        let header_len = HEADER.len();
+        ensure!(size >= header_len as u64, InvalidHeader);
+
+        let header_bytes = &mmap[0..header_len];
+        let header_str = std::str::from_utf8(header_bytes).map_err(|_| 
Error::InvalidHeader)?;
+
+        ensure!(header_str == HEADER, InvalidHeader);
+
+        // Read and validate all records
+        let mut pos = header_len;
+        let mut record_position = Vec::new();
+
+        while pos < size as usize {
+            ensure!(
+                pos + CRC_SIZE + RECORD_LENGTH_SIZE <= size as usize,
+                LengthMismatch {
+                    expected: pos + CRC_SIZE + RECORD_LENGTH_SIZE,
+                    actual: size as usize
+                }
+            );
+
+            // Read the CRC
+            let crc = u32::from_le_bytes(mmap[pos..pos + 
CRC_SIZE].try_into().context(Conversion)?);
+            pos += CRC_SIZE;
+
+            // Read the length
+            let length = u32::from_le_bytes(
+                mmap[pos..pos + RECORD_LENGTH_SIZE]
+                    .try_into()
+                    .context(Conversion)?,
+            );
+            pos += RECORD_LENGTH_SIZE;
+
+            // Ensure the entire record is within the bounds of the mmap
+            ensure!(
+                pos + length as usize <= size as usize,
+                LengthMismatch {
+                    expected: pos + length as usize,
+                    actual: size as usize
+                }
+            );
+
+            // Verify the checksum (CRC32 of the data)
+            let data = &mmap[pos..pos + length as usize];
+            let computed_crc = crc32fast::hash(data);
+            ensure!(
+                computed_crc == crc,
+                ChecksumMismatch {
+                    expected: crc,
+                    actual: computed_crc
+                }
+            );
+
+            record_position.push(Position {
+                start: (pos - CRC_SIZE - RECORD_LENGTH_SIZE) as u64,
+                end: (pos + length as usize) as u64,
+            });
+            // Move to the next record
+            pos += length as usize;
+        }
+
+        self.is_open = true;
+        self.file = Some(file);
+        self.mmap = Some(mmap);
+        self.record_position = Some(record_position);
+        self.size = size;
+        Ok(())
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.is_open = false;
+        self.file.take();
+        self.mmap.take();
+        self.record_position.take();
+        Ok(())
+    }
+
+    pub fn append(&mut self, data: &[u8]) -> Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull);

Review Comment:
   When the remaining space is almost full, will we switch to another segment 
file?



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::{BufMut, BytesMut};
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use crc32fast::Hasher;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open or create file: {}", source))]
+    FileOpen { source: io::Error },
+
+    #[snafu(display("Failed to map file to memory: {}", source))]
+    Mmap { source: io::Error },
+
+    #[snafu(display("Segment full"))]
+    SegmentFull,
+
+    #[snafu(display("Failed to append data to segment file: {}", source))]
+    SegmentAppend { source: io::Error },
+
+    #[snafu(display("Failed to flush mmap: {}", source))]
+    Flush { source: io::Error },
+
+    #[snafu(display(
+        "Attempted to read beyond segment size. Offset: {}, Size: {}, 
FileSize: {}",
+        offset,
+        size,
+        file_size
+    ))]
+    ReadOutOfBounds {
+        offset: u64,
+        size: u64,
+        file_size: u64,
+    },
+
+    #[snafu(display("Invalid segment header"))]
+    InvalidHeader,
+
+    #[snafu(display("Segment not open, id:{}", id))]
+    SegmentNotOpen { id: u64 },
+
+    #[snafu(display("Segment not found, id:{}", id))]
+    SegmentNotFound { id: u64 },
+
+    #[snafu(display("Unable to convert slice: {}", source))]
+    Conversion {
+        source: std::array::TryFromSliceError,
+    },
+
+    #[snafu(display("{}", source))]
+    Encoding { source: GenericError },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const HEADER: &str = "HoraeDB WAL";
+const CRC_SIZE: usize = 4;
+const RECORD_LENGTH_SIZE: usize = 4;
+const KEY_LENGTH_SIZE: usize = 2;
+const VALUE_LENGTH_SIZE: usize = 4;
+// todo: make MAX_FILE_SIZE configurable
+const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
+#[derive(Debug)]
+pub struct Segment {
+    path: String,
+    id: u64,
+    size: u64,
+    min_seq: SequenceNumber,
+    max_seq: SequenceNumber,
+    is_open: bool,
+    file: Option<File>,
+    mmap: Option<MmapMut>,
+    record_position: Option<Vec<Position>>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Position {
+    start: u64,
+    end: u64,
+}
+
+impl Segment {
+    pub fn new(path: String, segment_id: u64) -> Result<Segment> {
+        if !Path::new(&path).exists() {
+            let mut file = File::create(&path).context(FileOpen)?;
+            file.write_all(HEADER.as_bytes()).context(FileOpen)?;
+        }
+        Ok(Segment {
+            path,
+            id: segment_id,
+            size: HEADER.len() as u64,
+            is_open: false,
+            min_seq: MAX_SEQUENCE_NUMBER,
+            max_seq: MIN_SEQUENCE_NUMBER,
+            file: None,
+            mmap: None,
+            record_position: None,
+        })
+    }
+
+    pub fn open(&mut self) -> Result<()> {
+        let file = OpenOptions::new()
+            .read(true)
+            .append(true)
+            .open(&self.path)
+            .context(FileOpen)?;
+
+        let metadata = file.metadata().context(FileOpen)?;
+        let size = metadata.len();
+
+        let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };
+
+        // Validate segment header
+        let header_len = HEADER.len();
+        ensure!(size >= header_len as u64, InvalidHeader);
+
+        let header_bytes = &mmap[0..header_len];
+        let header_str = std::str::from_utf8(header_bytes).map_err(|_| 
Error::InvalidHeader)?;
+
+        ensure!(header_str == HEADER, InvalidHeader);
+
+        // Read and validate all records
+        let mut pos = header_len;
+        let mut record_position = Vec::new();
+
+        while pos < size as usize {
+            ensure!(
+                pos + CRC_SIZE + RECORD_LENGTH_SIZE <= size as usize,
+                LengthMismatch {
+                    expected: pos + CRC_SIZE + RECORD_LENGTH_SIZE,
+                    actual: size as usize
+                }
+            );
+
+            // Read the CRC
+            let crc = u32::from_le_bytes(mmap[pos..pos + 
CRC_SIZE].try_into().context(Conversion)?);
+            pos += CRC_SIZE;
+
+            // Read the length
+            let length = u32::from_le_bytes(
+                mmap[pos..pos + RECORD_LENGTH_SIZE]
+                    .try_into()
+                    .context(Conversion)?,
+            );
+            pos += RECORD_LENGTH_SIZE;
+
+            // Ensure the entire record is within the bounds of the mmap
+            ensure!(
+                pos + length as usize <= size as usize,
+                LengthMismatch {
+                    expected: pos + length as usize,
+                    actual: size as usize
+                }
+            );
+
+            // Verify the checksum (CRC32 of the data)
+            let data = &mmap[pos..pos + length as usize];
+            let computed_crc = crc32fast::hash(data);
+            ensure!(
+                computed_crc == crc,
+                ChecksumMismatch {
+                    expected: crc,
+                    actual: computed_crc
+                }
+            );
+
+            record_position.push(Position {
+                start: (pos - CRC_SIZE - RECORD_LENGTH_SIZE) as u64,
+                end: (pos + length as usize) as u64,
+            });
+            // Move to the next record
+            pos += length as usize;
+        }
+
+        self.is_open = true;
+        self.file = Some(file);
+        self.mmap = Some(mmap);
+        self.record_position = Some(record_position);
+        self.size = size;
+        Ok(())
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.is_open = false;
+        self.file.take();
+        self.mmap.take();
+        self.record_position.take();
+        Ok(())
+    }
+
+    pub fn append(&mut self, data: &[u8]) -> Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });

Review Comment:
   Can we use `self.file.is_some()` to indicate if it's opened?
   
   So we don't need the `is_open` field.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::{BufMut, BytesMut};
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use crc32fast::Hasher;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open or create file: {}", source))]
+    FileOpen { source: io::Error },
+
+    #[snafu(display("Failed to map file to memory: {}", source))]
+    Mmap { source: io::Error },
+
+    #[snafu(display("Segment full"))]
+    SegmentFull,
+
+    #[snafu(display("Failed to append data to segment file: {}", source))]
+    SegmentAppend { source: io::Error },
+
+    #[snafu(display("Failed to flush mmap: {}", source))]
+    Flush { source: io::Error },
+
+    #[snafu(display(
+        "Attempted to read beyond segment size. Offset: {}, Size: {}, 
FileSize: {}",
+        offset,
+        size,
+        file_size
+    ))]
+    ReadOutOfBounds {
+        offset: u64,
+        size: u64,
+        file_size: u64,
+    },
+
+    #[snafu(display("Invalid segment header"))]
+    InvalidHeader,
+
+    #[snafu(display("Segment not open, id:{}", id))]
+    SegmentNotOpen { id: u64 },
+
+    #[snafu(display("Segment not found, id:{}", id))]
+    SegmentNotFound { id: u64 },
+
+    #[snafu(display("Unable to convert slice: {}", source))]
+    Conversion {
+        source: std::array::TryFromSliceError,
+    },
+
+    #[snafu(display("{}", source))]
+    Encoding { source: GenericError },
+
+    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
+    InvalidRecord {
+        source: GenericError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Length mismatch: expected {} but found {}", expected, 
actual))]
+    LengthMismatch { expected: usize, actual: usize },
+
+    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, 
actual))]
+    ChecksumMismatch { expected: u32, actual: u32 },
+}
+
+define_result!(Error);
+
+const HEADER: &str = "HoraeDB WAL";
+const CRC_SIZE: usize = 4;
+const RECORD_LENGTH_SIZE: usize = 4;
+const KEY_LENGTH_SIZE: usize = 2;
+const VALUE_LENGTH_SIZE: usize = 4;
+// todo: make MAX_FILE_SIZE configurable
+const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
+#[derive(Debug)]
+pub struct Segment {
+    path: String,
+    id: u64,
+    size: u64,
+    min_seq: SequenceNumber,
+    max_seq: SequenceNumber,
+    is_open: bool,
+    file: Option<File>,
+    mmap: Option<MmapMut>,
+    record_position: Option<Vec<Position>>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Position {
+    start: u64,
+    end: u64,
+}
+
+impl Segment {
+    pub fn new(path: String, segment_id: u64) -> Result<Segment> {
+        if !Path::new(&path).exists() {
+            let mut file = File::create(&path).context(FileOpen)?;
+            file.write_all(HEADER.as_bytes()).context(FileOpen)?;
+        }
+        Ok(Segment {
+            path,
+            id: segment_id,
+            size: HEADER.len() as u64,
+            is_open: false,
+            min_seq: MAX_SEQUENCE_NUMBER,
+            max_seq: MIN_SEQUENCE_NUMBER,
+            file: None,
+            mmap: None,
+            record_position: None,
+        })
+    }
+
+    pub fn open(&mut self) -> Result<()> {
+        let file = OpenOptions::new()
+            .read(true)
+            .append(true)
+            .open(&self.path)
+            .context(FileOpen)?;
+
+        let metadata = file.metadata().context(FileOpen)?;
+        let size = metadata.len();
+
+        let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };
+
+        // Validate segment header
+        let header_len = HEADER.len();
+        ensure!(size >= header_len as u64, InvalidHeader);
+
+        let header_bytes = &mmap[0..header_len];
+        let header_str = std::str::from_utf8(header_bytes).map_err(|_| 
Error::InvalidHeader)?;
+
+        ensure!(header_str == HEADER, InvalidHeader);
+
+        // Read and validate all records
+        let mut pos = header_len;
+        let mut record_position = Vec::new();
+
+        while pos < size as usize {
+            ensure!(
+                pos + CRC_SIZE + RECORD_LENGTH_SIZE <= size as usize,
+                LengthMismatch {
+                    expected: pos + CRC_SIZE + RECORD_LENGTH_SIZE,
+                    actual: size as usize
+                }
+            );
+
+            // Read the CRC
+            let crc = u32::from_le_bytes(mmap[pos..pos + 
CRC_SIZE].try_into().context(Conversion)?);
+            pos += CRC_SIZE;
+
+            // Read the length
+            let length = u32::from_le_bytes(
+                mmap[pos..pos + RECORD_LENGTH_SIZE]
+                    .try_into()
+                    .context(Conversion)?,
+            );
+            pos += RECORD_LENGTH_SIZE;
+
+            // Ensure the entire record is within the bounds of the mmap
+            ensure!(
+                pos + length as usize <= size as usize,
+                LengthMismatch {
+                    expected: pos + length as usize,
+                    actual: size as usize
+                }
+            );
+
+            // Verify the checksum (CRC32 of the data)
+            let data = &mmap[pos..pos + length as usize];
+            let computed_crc = crc32fast::hash(data);
+            ensure!(
+                computed_crc == crc,
+                ChecksumMismatch {
+                    expected: crc,
+                    actual: computed_crc
+                }
+            );
+
+            record_position.push(Position {
+                start: (pos - CRC_SIZE - RECORD_LENGTH_SIZE) as u64,
+                end: (pos + length as usize) as u64,
+            });
+            // Move to the next record
+            pos += length as usize;
+        }
+
+        self.is_open = true;
+        self.file = Some(file);
+        self.mmap = Some(mmap);
+        self.record_position = Some(record_position);
+        self.size = size;
+        Ok(())
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.is_open = false;
+        self.file.take();
+        self.mmap.take();
+        self.record_position.take();
+        Ok(())
+    }
+
+    pub fn append(&mut self, data: &[u8]) -> Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull);
+
+        let Some(file) = &mut self.file else {
+            return SegmentNotOpen { id: self.id }.fail();
+        };
+        file.write_all(data).context(SegmentAppend)?;
+        file.flush().context(Flush)?;
+
+        // Remap
+        let mmap = unsafe { MmapOptions::new().map_mut(&*file).context(Mmap)? 
};
+        self.mmap = Some(mmap);
+        self.size += data.len() as u64;
+
+        Ok(())
+    }
+
+    pub fn read(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        ensure!(
+            offset + size <= self.size,
+            LengthMismatch {
+                expected: (offset + size) as usize,
+                actual: self.size as usize
+            }
+        );
+
+        let start = offset as usize;
+        let end = start + size as usize;
+        match &self.mmap {
+            Some(mmap) => Ok(mmap[start..end].to_vec()),
+            None => SegmentNotOpen { id: self.id }.fail(),
+        }
+    }
+
+    pub fn append_record_position(&mut self, pos: &mut Vec<Position>) -> 
Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        match self.record_position.as_mut() {
+            Some(record_position) => {
+                record_position.append(pos);
+                Ok(())
+            }
+            None => SegmentNotOpen { id: self.id }.fail(),
+        }
+    }
+
+    pub fn update_seq(&mut self, min_seq: u64, max_seq: u64) -> Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        if min_seq < self.min_seq {
+            self.min_seq = min_seq;
+        }
+        if max_seq > self.max_seq {
+            self.max_seq = max_seq;
+        }
+        Ok(())
+    }
+}
+
+pub struct SegmentManager {
+    /// All segments protected by a mutex
+    /// todo: maybe use a RWLock?
+    all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,
+
+    /// Cache for opened segments
+    cache: Mutex<VecDeque<u64>>,
+
+    /// Maximum size of the cache
+    cache_size: usize,
+
+    /// Directory for segment storage
+    _segment_dir: String,
+
+    /// Index of the latest segment for appending logs
+    latest_segment_idx: AtomicU64,
+
+    /// Encoding method for logs
+    log_encoding: CommonLogEncoding,
+
+    /// Sequence number for the next log
+    next_sequence_num: AtomicU64,
+
+    /// Runtime for handling write requests
+    runtime: Arc<Runtime>,
+}
+
+impl SegmentManager {
+    pub fn new(cache_size: usize, segment_dir: String, runtime: Arc<Runtime>) 
-> Result<Self> {
+        let mut all_segments = HashMap::new();
+
+        // Scan the directory for existing WAL files
+        let mut max_segment_id: i32 = -1;
+
+        // Segment file naming convention: segment_<id>.wal
+        for entry in fs::read_dir(&segment_dir).context(FileOpen)? {
+            let entry = entry.context(FileOpen)?;
+
+            let path = entry.path();
+
+            if !path.is_file() {
+                continue;
+            }
+
+            match path.extension() {
+                Some(ext) if ext == "wal" => ext,
+                _ => continue,
+            };
+
+            let file_name = match path.file_name().and_then(|name| 
name.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            let segment_id = match file_name
+                .trim_start_matches("segment_")
+                .trim_end_matches(".wal")
+                .parse::<u64>()
+                .ok()
+            {
+                Some(id) => id,
+                None => continue,
+            };
+
+            let segment = Segment::new(path.to_string_lossy().to_string(), 
segment_id)?;
+            let segment_arc = Arc::new(Mutex::new(segment));
+
+            if segment_id as i32 > max_segment_id {
+                max_segment_id = segment_id as i32;
+            }
+            all_segments.insert(segment_id, segment_arc);
+        }
+
+        // If no existing segments, create a new one
+        if max_segment_id == -1 {
+            max_segment_id = 0;
+            let path = format!("{}/segment_{}.wal", segment_dir, 
max_segment_id);
+            let new_segment = Segment::new(path, max_segment_id as u64)?;
+            let segment_arc = Arc::new(Mutex::new(new_segment));
+            all_segments.insert(0, segment_arc);
+        }
+
+        Ok(Self {
+            all_segments: Mutex::new(all_segments),
+            cache: Mutex::new(VecDeque::new()),
+            cache_size,
+            _segment_dir: segment_dir,
+            latest_segment_idx: AtomicU64::new(max_segment_id as u64),
+            log_encoding: CommonLogEncoding::newest(),
+            // todo: do not use MIN_SEQUENCE_NUMBER, read from the latest 
record
+            next_sequence_num: AtomicU64::new(MIN_SEQUENCE_NUMBER + 1),
+            runtime,
+        })
+    }
+
+    fn get_segment(&self, segment_id: u64) -> Result<Arc<Mutex<Segment>>> {
+        let mut cache = self.cache.lock().unwrap();
+        let all_segments = self.all_segments.lock().unwrap();
+
+        let segment = all_segments.get(&segment_id);
+
+        let segment = match segment {
+            Some(segment_arc) => segment_arc,
+            None => return SegmentNotFound { id: segment_id }.fail(),
+        };
+
+        // Check if segment is already in cache
+        if cache.iter().any(|id| *id == segment_id) {
+            let segment = all_segments.get(&segment_id);
+            return match segment {
+                Some(segment_arc) => Ok(segment_arc.clone()),
+                None => SegmentNotFound { id: segment_id }.fail(),
+            };
+        }
+
+        // If not in cache, load from disk
+        segment.lock().unwrap().open()?;
+
+        // Add to cache
+        if cache.len() == self.cache_size {
+            let evicted_segment_id = cache.pop_front();
+            // TODO: if the evicted segment is being read or written, wait for 
it to finish
+            if let Some(evicted_segment_id) = evicted_segment_id {
+                let evicted_segment = all_segments.get(&evicted_segment_id);
+                if let Some(evicted_segment) = evicted_segment {
+                    evicted_segment.lock().unwrap().close()?;
+                } else {
+                    return SegmentNotFound {
+                        id: evicted_segment_id,
+                    }
+                    .fail();
+                }
+            }
+        }
+        cache.push_back(segment_id);
+
+        Ok(segment.clone())
+    }
+
+    pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> 
Result<SequenceNumber> {
+        let segment = 
self.get_segment(self.latest_segment_idx.load(Ordering::Relaxed))?;

Review Comment:
   I found `latest_segment_idx` is only used here — did you forget to update it?
   
   Also, using an atomic integer may not be a good choice, since a race 
condition may exist; I suggest using `current: Mutex<Segment>` to keep the latest segment 
for writes.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::{BufMut, BytesMut};
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use crc32fast::Hasher;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
/// Errors produced by the local-storage WAL segment layer.
///
/// Display messages come from the snafu `display` attributes; variants with a
/// `source` field wrap the underlying error.
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Failed to open or create file: {}", source))]
    FileOpen { source: io::Error },

    #[snafu(display("Failed to map file to memory: {}", source))]
    Mmap { source: io::Error },

    // Raised when an append would push a segment past MAX_FILE_SIZE.
    #[snafu(display("Segment full"))]
    SegmentFull,

    #[snafu(display("Failed to append data to segment file: {}", source))]
    SegmentAppend { source: io::Error },

    #[snafu(display("Failed to flush mmap: {}", source))]
    Flush { source: io::Error },

    #[snafu(display(
        "Attempted to read beyond segment size. Offset: {}, Size: {}, FileSize: {}",
        offset,
        size,
        file_size
    ))]
    ReadOutOfBounds {
        offset: u64,
        size: u64,
        file_size: u64,
    },

    #[snafu(display("Invalid segment header"))]
    InvalidHeader,

    #[snafu(display("Segment not open, id:{}", id))]
    SegmentNotOpen { id: u64 },

    #[snafu(display("Segment not found, id:{}", id))]
    SegmentNotFound { id: u64 },

    #[snafu(display("Unable to convert slice: {}", source))]
    Conversion {
        source: std::array::TryFromSliceError,
    },

    #[snafu(display("{}", source))]
    Encoding { source: GenericError },

    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
    InvalidRecord {
        source: GenericError,
        backtrace: Backtrace,
    },

    #[snafu(display("Length mismatch: expected {} but found {}", expected, actual))]
    LengthMismatch { expected: usize, actual: usize },

    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, actual))]
    ChecksumMismatch { expected: u32, actual: u32 },
}

define_result!(Error);
+
/// Magic string written at the start of every segment file.
const HEADER: &str = "HoraeDB WAL";
/// Size in bytes of the CRC32 stored before each record.
const CRC_SIZE: usize = 4;
/// Size in bytes of the record-length field that follows the CRC.
const RECORD_LENGTH_SIZE: usize = 4;
/// Size in bytes of the key-length field inside an encoded record.
const KEY_LENGTH_SIZE: usize = 2;
/// Size in bytes of the value-length field inside an encoded record.
const VALUE_LENGTH_SIZE: usize = 4;
/// Maximum size of a single segment file (64 MiB).
// TODO: make MAX_FILE_SIZE configurable
const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
/// A single write-ahead-log segment file backed by a memory map.
#[derive(Debug)]
pub struct Segment {
    /// Path of the segment file on disk.
    path: String,
    /// Identifier of this segment (also encoded in the file name).
    id: u64,
    /// Current size of the segment file in bytes.
    size: u64,
    /// Minimum sequence number among records in this segment.
    min_seq: SequenceNumber,
    /// Maximum sequence number among records in this segment.
    max_seq: SequenceNumber,
    /// Whether the file handle and mmap below are currently open.
    is_open: bool,
    /// Open file handle; `None` while the segment is closed.
    file: Option<File>,
    /// Memory map of the file; `None` while the segment is closed.
    mmap: Option<MmapMut>,
    /// Byte positions of every record in the file; `None` while closed.
    record_position: Option<Vec<Position>>,
}

/// Byte range `[start, end)` of one record inside a segment file.
#[derive(Debug, Clone)]
pub struct Position {
    start: u64,
    end: u64,
}
+
+impl Segment {
+    pub fn new(path: String, segment_id: u64) -> Result<Segment> {
+        if !Path::new(&path).exists() {
+            let mut file = File::create(&path).context(FileOpen)?;
+            file.write_all(HEADER.as_bytes()).context(FileOpen)?;
+        }
+        Ok(Segment {
+            path,
+            id: segment_id,
+            size: HEADER.len() as u64,
+            is_open: false,
+            min_seq: MAX_SEQUENCE_NUMBER,
+            max_seq: MIN_SEQUENCE_NUMBER,
+            file: None,
+            mmap: None,
+            record_position: None,
+        })
+    }
+
+    pub fn open(&mut self) -> Result<()> {
+        let file = OpenOptions::new()
+            .read(true)
+            .append(true)
+            .open(&self.path)
+            .context(FileOpen)?;
+
+        let metadata = file.metadata().context(FileOpen)?;
+        let size = metadata.len();
+
+        let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };
+
+        // Validate segment header
+        let header_len = HEADER.len();
+        ensure!(size >= header_len as u64, InvalidHeader);
+
+        let header_bytes = &mmap[0..header_len];
+        let header_str = std::str::from_utf8(header_bytes).map_err(|_| 
Error::InvalidHeader)?;
+
+        ensure!(header_str == HEADER, InvalidHeader);
+
+        // Read and validate all records
+        let mut pos = header_len;
+        let mut record_position = Vec::new();
+
+        while pos < size as usize {
+            ensure!(
+                pos + CRC_SIZE + RECORD_LENGTH_SIZE <= size as usize,
+                LengthMismatch {
+                    expected: pos + CRC_SIZE + RECORD_LENGTH_SIZE,
+                    actual: size as usize
+                }
+            );
+
+            // Read the CRC
+            let crc = u32::from_le_bytes(mmap[pos..pos + 
CRC_SIZE].try_into().context(Conversion)?);
+            pos += CRC_SIZE;
+
+            // Read the length
+            let length = u32::from_le_bytes(
+                mmap[pos..pos + RECORD_LENGTH_SIZE]
+                    .try_into()
+                    .context(Conversion)?,
+            );
+            pos += RECORD_LENGTH_SIZE;
+
+            // Ensure the entire record is within the bounds of the mmap
+            ensure!(
+                pos + length as usize <= size as usize,
+                LengthMismatch {
+                    expected: pos + length as usize,
+                    actual: size as usize
+                }
+            );
+
+            // Verify the checksum (CRC32 of the data)
+            let data = &mmap[pos..pos + length as usize];
+            let computed_crc = crc32fast::hash(data);
+            ensure!(
+                computed_crc == crc,
+                ChecksumMismatch {
+                    expected: crc,
+                    actual: computed_crc
+                }
+            );
+
+            record_position.push(Position {
+                start: (pos - CRC_SIZE - RECORD_LENGTH_SIZE) as u64,
+                end: (pos + length as usize) as u64,
+            });
+            // Move to the next record
+            pos += length as usize;
+        }
+
+        self.is_open = true;
+        self.file = Some(file);
+        self.mmap = Some(mmap);
+        self.record_position = Some(record_position);
+        self.size = size;
+        Ok(())
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.is_open = false;
+        self.file.take();
+        self.mmap.take();
+        self.record_position.take();
+        Ok(())
+    }
+
+    pub fn append(&mut self, data: &[u8]) -> Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull);
+
+        let Some(file) = &mut self.file else {
+            return SegmentNotOpen { id: self.id }.fail();
+        };
+        file.write_all(data).context(SegmentAppend)?;
+        file.flush().context(Flush)?;
+
+        // Remap
+        let mmap = unsafe { MmapOptions::new().map_mut(&*file).context(Mmap)? 
};
+        self.mmap = Some(mmap);
+        self.size += data.len() as u64;
+
+        Ok(())
+    }
+
+    pub fn read(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        ensure!(
+            offset + size <= self.size,
+            LengthMismatch {
+                expected: (offset + size) as usize,
+                actual: self.size as usize
+            }
+        );
+
+        let start = offset as usize;
+        let end = start + size as usize;
+        match &self.mmap {
+            Some(mmap) => Ok(mmap[start..end].to_vec()),
+            None => SegmentNotOpen { id: self.id }.fail(),
+        }
+    }
+
+    pub fn append_record_position(&mut self, pos: &mut Vec<Position>) -> 
Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        match self.record_position.as_mut() {
+            Some(record_position) => {
+                record_position.append(pos);
+                Ok(())
+            }
+            None => SegmentNotOpen { id: self.id }.fail(),
+        }
+    }
+
+    pub fn update_seq(&mut self, min_seq: u64, max_seq: u64) -> Result<()> {
+        ensure!(self.is_open, SegmentNotOpen { id: self.id });
+        if min_seq < self.min_seq {
+            self.min_seq = min_seq;
+        }
+        if max_seq > self.max_seq {
+            self.max_seq = max_seq;
+        }
+        Ok(())
+    }
+}
+
/// Manages all WAL segments: the full id -> segment map, a FIFO cache of
/// currently opened segments, and the append-side state.
pub struct SegmentManager {
    /// All segments, keyed by segment id, protected by a mutex.
    /// TODO: maybe use a RwLock?
    all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,

    /// Ids of currently opened segments, in open order (front = oldest).
    cache: Mutex<VecDeque<u64>>,

    /// Maximum number of segments kept open at once.
    cache_size: usize,

    /// Directory for segment storage.
    _segment_dir: String,

    /// Index of the latest segment for appending logs.
    // NOTE(review): within this file it is set once at construction and only
    // read in `write` — confirm whether segment rollover should advance it.
    latest_segment_idx: AtomicU64,

    /// Encoding method for log keys.
    log_encoding: CommonLogEncoding,

    /// Sequence number to assign to the next log entry.
    next_sequence_num: AtomicU64,

    /// Runtime for handling write requests.
    runtime: Arc<Runtime>,
}
+
+impl SegmentManager {
+    pub fn new(cache_size: usize, segment_dir: String, runtime: Arc<Runtime>) 
-> Result<Self> {
+        let mut all_segments = HashMap::new();
+
+        // Scan the directory for existing WAL files
+        let mut max_segment_id: i32 = -1;
+
+        // Segment file naming convention: segment_<id>.wal
+        for entry in fs::read_dir(&segment_dir).context(FileOpen)? {
+            let entry = entry.context(FileOpen)?;
+
+            let path = entry.path();
+
+            if !path.is_file() {
+                continue;
+            }
+
+            match path.extension() {
+                Some(ext) if ext == "wal" => ext,
+                _ => continue,
+            };
+
+            let file_name = match path.file_name().and_then(|name| 
name.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            let segment_id = match file_name
+                .trim_start_matches("segment_")
+                .trim_end_matches(".wal")
+                .parse::<u64>()
+                .ok()
+            {
+                Some(id) => id,
+                None => continue,
+            };
+
+            let segment = Segment::new(path.to_string_lossy().to_string(), 
segment_id)?;
+            let segment_arc = Arc::new(Mutex::new(segment));
+
+            if segment_id as i32 > max_segment_id {
+                max_segment_id = segment_id as i32;
+            }
+            all_segments.insert(segment_id, segment_arc);
+        }
+
+        // If no existing segments, create a new one
+        if max_segment_id == -1 {
+            max_segment_id = 0;
+            let path = format!("{}/segment_{}.wal", segment_dir, 
max_segment_id);
+            let new_segment = Segment::new(path, max_segment_id as u64)?;
+            let segment_arc = Arc::new(Mutex::new(new_segment));
+            all_segments.insert(0, segment_arc);
+        }
+
+        Ok(Self {
+            all_segments: Mutex::new(all_segments),
+            cache: Mutex::new(VecDeque::new()),
+            cache_size,
+            _segment_dir: segment_dir,
+            latest_segment_idx: AtomicU64::new(max_segment_id as u64),
+            log_encoding: CommonLogEncoding::newest(),
+            // todo: do not use MIN_SEQUENCE_NUMBER, read from the latest 
record
+            next_sequence_num: AtomicU64::new(MIN_SEQUENCE_NUMBER + 1),
+            runtime,
+        })
+    }
+
+    fn get_segment(&self, segment_id: u64) -> Result<Arc<Mutex<Segment>>> {
+        let mut cache = self.cache.lock().unwrap();
+        let all_segments = self.all_segments.lock().unwrap();
+
+        let segment = all_segments.get(&segment_id);
+
+        let segment = match segment {
+            Some(segment_arc) => segment_arc,
+            None => return SegmentNotFound { id: segment_id }.fail(),
+        };
+
+        // Check if segment is already in cache
+        if cache.iter().any(|id| *id == segment_id) {
+            let segment = all_segments.get(&segment_id);
+            return match segment {
+                Some(segment_arc) => Ok(segment_arc.clone()),
+                None => SegmentNotFound { id: segment_id }.fail(),
+            };
+        }
+
+        // If not in cache, load from disk
+        segment.lock().unwrap().open()?;
+
+        // Add to cache
+        if cache.len() == self.cache_size {
+            let evicted_segment_id = cache.pop_front();
+            // TODO: if the evicted segment is being read or written, wait for 
it to finish
+            if let Some(evicted_segment_id) = evicted_segment_id {
+                let evicted_segment = all_segments.get(&evicted_segment_id);
+                if let Some(evicted_segment) = evicted_segment {
+                    evicted_segment.lock().unwrap().close()?;
+                } else {
+                    return SegmentNotFound {
+                        id: evicted_segment_id,
+                    }
+                    .fail();
+                }
+            }
+        }
+        cache.push_back(segment_id);
+
+        Ok(segment.clone())
+    }
+
+    pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> 
Result<SequenceNumber> {
+        let segment = 
self.get_segment(self.latest_segment_idx.load(Ordering::Relaxed))?;
+        let mut segment = segment.lock().unwrap();
+
+        let entries_num = batch.len() as u64;
+        let region_id = batch.location.region_id;
+
+        let mut key_buf = BytesMut::new();
+        let prev_sequence_num = self.alloc_sequence_num(entries_num);
+        let mut next_sequence_num = prev_sequence_num;
+        let mut data = Vec::new();
+        let mut record_position = Vec::new();
+
+        for entry in &batch.entries {
+            let mut record_content = Vec::new();
+
+            self.log_encoding
+                .encode_key(
+                    &mut key_buf,
+                    &CommonLogKey::new(region_id, batch.location.table_id, 
next_sequence_num),
+                )
+                .box_err()
+                .context(Encoding)?;
+
+            let key_len = key_buf.len() as u16;
+            record_content.put_u16_le(key_len);
+            record_content.extend_from_slice(&key_buf);
+
+            let value_len = entry.payload.len() as u32;
+            record_content.put_u32_le(value_len);
+            record_content.extend_from_slice(&entry.payload);
+
+            record_position.push(Position {
+                start: data.len() as u64,
+                end: (data.len() + record_content.len() + CRC_SIZE + 
RECORD_LENGTH_SIZE) as u64,
+            });
+
+            // Calculate and encode the CRC
+            let mut hasher = Hasher::new();
+            hasher.update(&record_content);
+            let crc = hasher.finalize();
+            data.put_u32_le(crc);

Review Comment:
   It would be better to add a `RecordEncode` struct to do this; then we can easily test it.



##########
src/wal/src/local_storage_impl/wal_manager.rs:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    fmt,
+    fmt::{Debug, Formatter},
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use async_trait::async_trait;
+use common_types::SequenceNumber;
+use generic_error::BoxError;
+use logger::{debug, info};
+use runtime::Runtime;
+use snafu::ResultExt;
+
+use crate::{
+    config::{Config, StorageConfig},
+    local_storage_impl::{config::LocalStorageConfig, segment::SegmentManager},
+    log_batch::LogWriteBatch,
+    manager::{
+        error::*, BatchLogIteratorAdapter, Open, OpenedWals, ReadContext, 
ReadRequest, RegionId,
+        ScanContext, ScanRequest, WalLocation, WalManager, WalManagerRef, 
WalRuntimes, WalsOpener,
+        WriteContext, MANIFEST_DIR_NAME, WAL_DIR_NAME,
+    },
+};
+
+pub struct LocalStorageImpl {
+    config: LocalStorageConfig,
+    _runtime: Arc<Runtime>,
+    segment_manager: SegmentManager,
+}
+
+impl LocalStorageImpl {
+    pub fn new(
+        wal_path: PathBuf,
+        config: LocalStorageConfig,
+        runtime: Arc<Runtime>,
+    ) -> Result<Self> {
+        let LocalStorageConfig { cache_size, .. } = config.clone();
+        let wal_path_str = wal_path.to_str().unwrap().to_string();
+        let segment_manager =
+            SegmentManager::new(cache_size, wal_path_str.clone(), 
runtime.clone())
+                .box_err()
+                .context(Open {
+                    wal_path: wal_path_str,
+                })?;
+        Ok(Self {
+            config,
+            _runtime: runtime,
+            segment_manager,
+        })
+    }
+}
+
+impl Drop for LocalStorageImpl {
+    fn drop(&mut self) {
+        info!("LocalStorage dropped, config:{:?}", self.config);
+    }
+}
+
+impl Debug for LocalStorageImpl {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("LocalStorageImpl")
+            .field("config", &self.config)
+            .finish()
+    }
+}
+
+#[async_trait]
+impl WalManager for LocalStorageImpl {
+    async fn sequence_num(&self, location: WalLocation) -> Result<u64> {
+        self.segment_manager
+            .sequence_num(location)
+            .box_err()
+            .context(Read)
+    }
+
+    async fn mark_delete_entries_up_to(
+        &self,
+        location: WalLocation,
+        sequence_num: SequenceNumber,
+    ) -> Result<()> {
+        self.segment_manager
+            .mark_delete_entries_up_to(location, sequence_num)
+            .box_err()
+            .context(Delete)
+    }
+
+    async fn close_region(&self, region_id: RegionId) -> Result<()> {
+        debug!(
+            "Close region for LocalStorage based WAL is noop operation, 
region_id:{}",
+            region_id
+        );
+
+        Ok(())
+    }
+
+    async fn close_gracefully(&self) -> Result<()> {
+        info!("Close local storage wal gracefully");

Review Comment:
   Add a TODO comment: we should close all opened files in this step.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::{BufMut, BytesMut};
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use crc32fast::Hasher;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
/// Errors produced by the local-storage WAL segment layer.
///
/// Display messages come from the snafu `display` attributes; variants with a
/// `source` field wrap the underlying error.
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Failed to open or create file: {}", source))]
    FileOpen { source: io::Error },

    #[snafu(display("Failed to map file to memory: {}", source))]
    Mmap { source: io::Error },

    // Raised when an append would push a segment past MAX_FILE_SIZE.
    #[snafu(display("Segment full"))]
    SegmentFull,

    #[snafu(display("Failed to append data to segment file: {}", source))]
    SegmentAppend { source: io::Error },

    #[snafu(display("Failed to flush mmap: {}", source))]
    Flush { source: io::Error },

    #[snafu(display(
        "Attempted to read beyond segment size. Offset: {}, Size: {}, FileSize: {}",
        offset,
        size,
        file_size
    ))]
    ReadOutOfBounds {
        offset: u64,
        size: u64,
        file_size: u64,
    },

    #[snafu(display("Invalid segment header"))]
    InvalidHeader,

    #[snafu(display("Segment not open, id:{}", id))]
    SegmentNotOpen { id: u64 },

    #[snafu(display("Segment not found, id:{}", id))]
    SegmentNotFound { id: u64 },

    #[snafu(display("Unable to convert slice: {}", source))]
    Conversion {
        source: std::array::TryFromSliceError,
    },

    #[snafu(display("{}", source))]
    Encoding { source: GenericError },

    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
    InvalidRecord {
        source: GenericError,
        backtrace: Backtrace,
    },

    #[snafu(display("Length mismatch: expected {} but found {}", expected, actual))]
    LengthMismatch { expected: usize, actual: usize },

    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, actual))]
    ChecksumMismatch { expected: u32, actual: u32 },
}

define_result!(Error);
+
/// Magic string written at the start of every segment file.
const HEADER: &str = "HoraeDB WAL";
/// Size in bytes of the CRC32 stored before each record.
const CRC_SIZE: usize = 4;
/// Size in bytes of the record-length field that follows the CRC.
const RECORD_LENGTH_SIZE: usize = 4;
/// Size in bytes of the key-length field inside an encoded record.
const KEY_LENGTH_SIZE: usize = 2;
/// Size in bytes of the value-length field inside an encoded record.
const VALUE_LENGTH_SIZE: usize = 4;
/// Maximum size of a single segment file (64 MiB).
// TODO: make MAX_FILE_SIZE configurable
const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
/// A single write-ahead-log segment file backed by a memory map.
#[derive(Debug)]
pub struct Segment {
    /// Path of the segment file on disk.
    path: String,
    /// Identifier of this segment (also encoded in the file name).
    id: u64,
    /// Current size of the segment file in bytes.
    size: u64,
    /// Minimum sequence number among records in this segment.
    min_seq: SequenceNumber,
    /// Maximum sequence number among records in this segment.
    max_seq: SequenceNumber,
    /// Whether the file handle and mmap below are currently open.
    is_open: bool,
    /// Open file handle; `None` while the segment is closed.
    file: Option<File>,
    /// Memory map of the file; `None` while the segment is closed.
    mmap: Option<MmapMut>,
    /// Byte positions of every record in the file; `None` while closed.
    record_position: Option<Vec<Position>>,
}

/// Byte range `[start, end)` of one record inside a segment file.
#[derive(Debug, Clone)]
pub struct Position {
    start: u64,
    end: u64,
}
+
impl Segment {
    /// Creates a handle for the segment file at `path`, creating the file
    /// (with its magic header) if it does not exist yet.
    ///
    /// The returned segment starts closed; call [`Segment::open`] before
    /// reading or appending.
    pub fn new(path: String, segment_id: u64) -> Result<Segment> {
        if !Path::new(&path).exists() {
            let mut file = File::create(&path).context(FileOpen)?;
            file.write_all(HEADER.as_bytes()).context(FileOpen)?;
        }
        Ok(Segment {
            path,
            id: segment_id,
            // The actual on-disk size is refreshed from file metadata in `open`.
            size: HEADER.len() as u64,
            is_open: false,
            min_seq: MAX_SEQUENCE_NUMBER,
            max_seq: MIN_SEQUENCE_NUMBER,
            file: None,
            mmap: None,
            record_position: None,
        })
    }

    /// Opens the segment file, memory-maps it, validates the magic header and
    /// the CRC of every record, and rebuilds the in-memory record index.
    ///
    /// On-disk record layout: `[crc: u32 LE][length: u32 LE][data: length bytes]`,
    /// where `crc` is the CRC32 of `data`.
    pub fn open(&mut self) -> Result<()> {
        let file = OpenOptions::new()
            .read(true)
            .append(true)
            .open(&self.path)
            .context(FileOpen)?;

        let metadata = file.metadata().context(FileOpen)?;
        let size = metadata.len();

        // SAFETY: the mapping is private to this struct and the backing file
        // handle is stored alongside it, keeping the file alive for the
        // mapping's lifetime.
        let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };

        // Validate segment header
        let header_len = HEADER.len();
        ensure!(size >= header_len as u64, InvalidHeader);

        let header_bytes = &mmap[0..header_len];
        let header_str = std::str::from_utf8(header_bytes).map_err(|_| Error::InvalidHeader)?;

        ensure!(header_str == HEADER, InvalidHeader);

        // Read and validate all records
        let mut pos = header_len;
        let mut record_position = Vec::new();

        while pos < size as usize {
            // The CRC and length prefix must fit before the end of the file.
            ensure!(
                pos + CRC_SIZE + RECORD_LENGTH_SIZE <= size as usize,
                LengthMismatch {
                    expected: pos + CRC_SIZE + RECORD_LENGTH_SIZE,
                    actual: size as usize
                }
            );

            // Read the CRC
            let crc = u32::from_le_bytes(mmap[pos..pos + CRC_SIZE].try_into().context(Conversion)?);
            pos += CRC_SIZE;

            // Read the length
            let length = u32::from_le_bytes(
                mmap[pos..pos + RECORD_LENGTH_SIZE]
                    .try_into()
                    .context(Conversion)?,
            );
            pos += RECORD_LENGTH_SIZE;

            // Ensure the entire record is within the bounds of the mmap
            ensure!(
                pos + length as usize <= size as usize,
                LengthMismatch {
                    expected: pos + length as usize,
                    actual: size as usize
                }
            );

            // Verify the checksum (CRC32 of the data)
            let data = &mmap[pos..pos + length as usize];
            let computed_crc = crc32fast::hash(data);
            ensure!(
                computed_crc == crc,
                ChecksumMismatch {
                    expected: crc,
                    actual: computed_crc
                }
            );

            // Index the record, including its CRC and length prefix.
            record_position.push(Position {
                start: (pos - CRC_SIZE - RECORD_LENGTH_SIZE) as u64,
                end: (pos + length as usize) as u64,
            });
            // Move to the next record
            pos += length as usize;
        }

        self.is_open = true;
        self.file = Some(file);
        self.mmap = Some(mmap);
        self.record_position = Some(record_position);
        self.size = size;
        Ok(())
    }

    /// Closes the segment, dropping the file handle, the mmap and the record
    /// index. Safe to call on an already-closed segment.
    pub fn close(&mut self) -> Result<()> {
        self.is_open = false;
        self.file.take();
        self.mmap.take();
        self.record_position.take();
        Ok(())
    }

    /// Appends already-encoded record bytes to the segment file and remaps it
    /// so the new data becomes visible through the mmap.
    ///
    /// Fails with `SegmentFull` if the append would exceed `MAX_FILE_SIZE`.
    pub fn append(&mut self, data: &[u8]) -> Result<()> {
        ensure!(self.is_open, SegmentNotOpen { id: self.id });
        ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull);

        let Some(file) = &mut self.file else {
            return SegmentNotOpen { id: self.id }.fail();
        };
        file.write_all(data).context(SegmentAppend)?;
        file.flush().context(Flush)?;

        // Remap so the freshly appended bytes are readable via `read`.
        // SAFETY: same invariants as in `open`.
        let mmap = unsafe { MmapOptions::new().map_mut(&*file).context(Mmap)? };
        self.mmap = Some(mmap);
        self.size += data.len() as u64;

        Ok(())
    }

    /// Reads `size` bytes starting at `offset` and returns an owned copy.
    // NOTE(review): `offset + size` may overflow before the bounds check, and
    // the dedicated `ReadOutOfBounds` variant is unused here — consider
    // checked addition plus that variant.
    pub fn read(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
        ensure!(self.is_open, SegmentNotOpen { id: self.id });
        ensure!(
            offset + size <= self.size,
            LengthMismatch {
                expected: (offset + size) as usize,
                actual: self.size as usize
            }
        );

        let start = offset as usize;
        let end = start + size as usize;
        match &self.mmap {
            Some(mmap) => Ok(mmap[start..end].to_vec()),
            None => SegmentNotOpen { id: self.id }.fail(),
        }
    }

    /// Appends pre-computed record positions (drained out of `pos`) to the
    /// in-memory record index.
    pub fn append_record_position(&mut self, pos: &mut Vec<Position>) -> Result<()> {
        ensure!(self.is_open, SegmentNotOpen { id: self.id });
        match self.record_position.as_mut() {
            Some(record_position) => {
                record_position.append(pos);
                Ok(())
            }
            None => SegmentNotOpen { id: self.id }.fail(),
        }
    }

    /// Widens this segment's `[min_seq, max_seq]` range so it covers the
    /// given bounds.
    pub fn update_seq(&mut self, min_seq: u64, max_seq: u64) -> Result<()> {
        ensure!(self.is_open, SegmentNotOpen { id: self.id });
        if min_seq < self.min_seq {
            self.min_seq = min_seq;
        }
        if max_seq > self.max_seq {
            self.max_seq = max_seq;
        }
        Ok(())
    }
}
+
/// Manages all WAL segments: the full id -> segment map, a FIFO cache of
/// currently opened segments, and the append-side state.
pub struct SegmentManager {
    /// All segments, keyed by segment id, protected by a mutex.
    /// TODO: maybe use a RwLock?
    all_segments: Mutex<HashMap<u64, Arc<Mutex<Segment>>>>,

    /// Ids of currently opened segments, in open order (front = oldest).
    cache: Mutex<VecDeque<u64>>,

    /// Maximum number of segments kept open at once.
    cache_size: usize,

    /// Directory for segment storage.
    _segment_dir: String,

    /// Index of the latest segment for appending logs.
    // NOTE(review): within this file it is set once at construction and only
    // read in `write` — confirm whether segment rollover should advance it.
    latest_segment_idx: AtomicU64,

    /// Encoding method for log keys.
    log_encoding: CommonLogEncoding,

    /// Sequence number to assign to the next log entry.
    next_sequence_num: AtomicU64,

    /// Runtime for handling write requests.
    runtime: Arc<Runtime>,
}
+
+impl SegmentManager {
+    pub fn new(cache_size: usize, segment_dir: String, runtime: Arc<Runtime>) 
-> Result<Self> {
+        let mut all_segments = HashMap::new();
+
+        // Scan the directory for existing WAL files
+        let mut max_segment_id: i32 = -1;
+
+        // Segment file naming convention: segment_<id>.wal
+        for entry in fs::read_dir(&segment_dir).context(FileOpen)? {
+            let entry = entry.context(FileOpen)?;
+
+            let path = entry.path();
+
+            if !path.is_file() {
+                continue;
+            }
+
+            match path.extension() {
+                Some(ext) if ext == "wal" => ext,
+                _ => continue,
+            };
+
+            let file_name = match path.file_name().and_then(|name| 
name.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            let segment_id = match file_name
+                .trim_start_matches("segment_")
+                .trim_end_matches(".wal")
+                .parse::<u64>()
+                .ok()
+            {
+                Some(id) => id,
+                None => continue,
+            };
+
+            let segment = Segment::new(path.to_string_lossy().to_string(), 
segment_id)?;
+            let segment_arc = Arc::new(Mutex::new(segment));

Review Comment:
   Just name it `segment`.



##########
src/wal/src/local_storage_impl/segment.rs:
##########
@@ -0,0 +1,741 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::Debug,
+    fs,
+    fs::{File, OpenOptions},
+    io,
+    io::Write,
+    path::Path,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use bytes_ext::{BufMut, BytesMut};
+use common_types::{table::TableId, SequenceNumber, MAX_SEQUENCE_NUMBER, 
MIN_SEQUENCE_NUMBER};
+use crc32fast::Hasher;
+use generic_error::{BoxError, GenericError};
+use macros::define_result;
+use memmap2::{MmapMut, MmapOptions};
+use runtime::Runtime;
+use snafu::{ensure, Backtrace, ResultExt, Snafu};
+
+use crate::{
+    kv_encoder::{CommonLogEncoding, CommonLogKey},
+    log_batch::{LogEntry, LogWriteBatch},
+    manager::{
+        BatchLogIteratorAdapter, Read, ReadContext, ReadRequest, RegionId, 
ScanContext,
+        ScanRequest, SyncLogIterator, WalLocation, WriteContext,
+    },
+};
+
/// Errors produced by the local-storage WAL segment layer.
///
/// Variant names double as snafu context selectors (e.g. `FileOpen`,
/// `Mmap`) used with `ResultExt::context` throughout this module.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Opening or creating a segment file on disk failed.
    #[snafu(display("Failed to open or create file: {}", source))]
    FileOpen { source: io::Error },

    /// Memory-mapping a segment file failed.
    #[snafu(display("Failed to map file to memory: {}", source))]
    Mmap { source: io::Error },

    /// The segment has no room left for the record being appended.
    #[snafu(display("Segment full"))]
    SegmentFull,

    /// Writing appended bytes to the segment file failed.
    #[snafu(display("Failed to append data to segment file: {}", source))]
    SegmentAppend { source: io::Error },

    /// Flushing the memory map back to disk failed.
    #[snafu(display("Failed to flush mmap: {}", source))]
    Flush { source: io::Error },

    /// A read was requested past the end of the segment file.
    #[snafu(display(
        "Attempted to read beyond segment size. Offset: {}, Size: {}, FileSize: {}",
        offset,
        size,
        file_size
    ))]
    ReadOutOfBounds {
        offset: u64,
        size: u64,
        file_size: u64,
    },

    /// The segment file does not start with the expected magic header.
    #[snafu(display("Invalid segment header"))]
    InvalidHeader,

    /// Operation requires the segment to be open (`file`/`mmap` present).
    #[snafu(display("Segment not open, id:{}", id))]
    SegmentNotOpen { id: u64 },

    /// No segment with the given id is known to the manager.
    #[snafu(display("Segment not found, id:{}", id))]
    SegmentNotFound { id: u64 },

    /// A byte slice could not be converted to a fixed-size array.
    #[snafu(display("Unable to convert slice: {}", source))]
    Conversion {
        source: std::array::TryFromSliceError,
    },

    /// Log key/value encoding or decoding failed.
    #[snafu(display("{}", source))]
    Encoding { source: GenericError },

    /// A record on disk could not be parsed.
    #[snafu(display("Invalid record: {}, backtrace:\n{}", source, backtrace))]
    InvalidRecord {
        source: GenericError,
        backtrace: Backtrace,
    },

    /// A length field did not match the actual payload length.
    #[snafu(display("Length mismatch: expected {} but found {}", expected, actual))]
    LengthMismatch { expected: usize, actual: usize },

    /// A stored checksum did not match the recomputed one.
    #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, actual))]
    ChecksumMismatch { expected: u32, actual: u32 },
}
+
define_result!(Error);

/// Magic string written at the start of every segment file; checked on open
/// to reject files that are not (or are no longer) valid WAL segments.
const HEADER: &str = "HoraeDB WAL";
/// Byte width of the per-record CRC field (crc32 — see the `crc32fast` use).
const CRC_SIZE: usize = 4;
/// Byte width of the record-length prefix (u32).
const RECORD_LENGTH_SIZE: usize = 4;
/// Byte width of the key-length prefix (u16).
const KEY_LENGTH_SIZE: usize = 2;
/// Byte width of the value-length prefix (u32).
const VALUE_LENGTH_SIZE: usize = 4;
// TODO: make MAX_FILE_SIZE configurable
/// Maximum size of a single segment file (64 MiB).
const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024;
+
/// A single WAL segment file on disk.
///
/// A `Segment` starts closed: `file`, `mmap` and `record_position` are
/// `None` until `open` is called, which opens the file and memory-maps it.
#[derive(Debug)]
pub struct Segment {
    /// Filesystem path of the segment file.
    path: String,
    /// Identifier of this segment (parsed from the `segment_<id>.wal` name).
    id: u64,
    /// Size of the segment file in bytes, including the magic header.
    size: u64,
    /// Smallest sequence number stored in this segment; initialized to
    /// `MAX_SEQUENCE_NUMBER` while the segment holds no records.
    min_seq: SequenceNumber,
    /// Largest sequence number stored in this segment; initialized to
    /// `MIN_SEQUENCE_NUMBER` while the segment holds no records.
    max_seq: SequenceNumber,
    /// Whether `open` has been called (file handle and mmap are held).
    is_open: bool,
    /// Underlying file handle; `None` until the segment is opened.
    file: Option<File>,
    /// Writable memory map of the file; `None` until the segment is opened.
    mmap: Option<MmapMut>,
    /// Byte ranges of the records in the file; `None` until the segment is
    /// opened. NOTE(review): presumably rebuilt by scanning on open — confirm.
    record_position: Option<Vec<Position>>,
}
+
/// Byte range of a single record within a segment file.
/// NOTE(review): assumed half-open `[start, end)` — confirm against the
/// read/append code that populates `record_position`.
#[derive(Debug, Clone)]
pub struct Position {
    /// Offset of the first byte of the record.
    start: u64,
    /// Offset one past the last byte of the record (assumed exclusive).
    end: u64,
}
+
+impl Segment {
+    pub fn new(path: String, segment_id: u64) -> Result<Segment> {
+        if !Path::new(&path).exists() {
+            let mut file = File::create(&path).context(FileOpen)?;
+            file.write_all(HEADER.as_bytes()).context(FileOpen)?;
+        }
+        Ok(Segment {
+            path,
+            id: segment_id,
+            size: HEADER.len() as u64,
+            is_open: false,
+            min_seq: MAX_SEQUENCE_NUMBER,
+            max_seq: MIN_SEQUENCE_NUMBER,
+            file: None,
+            mmap: None,
+            record_position: None,
+        })
+    }
+
+    pub fn open(&mut self) -> Result<()> {
+        let file = OpenOptions::new()
+            .read(true)
+            .append(true)
+            .open(&self.path)
+            .context(FileOpen)?;
+
+        let metadata = file.metadata().context(FileOpen)?;
+        let size = metadata.len();
+
+        let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? };
+
+        // Validate segment header
+        let header_len = HEADER.len();
+        ensure!(size >= header_len as u64, InvalidHeader);
+
+        let header_bytes = &mmap[0..header_len];
+        let header_str = std::str::from_utf8(header_bytes).map_err(|_| 
Error::InvalidHeader)?;

Review Comment:
   I think we can just compare the bytes directly here; there is no need to
convert them to a UTF-8 string.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to