This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch clean-up
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ba8bf559de68892b87fde961bd9776a51ea5de51
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Apr 8 00:14:01 2024 +0000

    Remove wal and fix e2e
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                         |    3 +
 dist/LICENSE                                       |    1 -
 dist/licenses/license-github.com-golang-snappy.txt |   27 -
 docs/concept/persistence-storage.md                |    6 +-
 docs/concept/wal.md                                |   47 -
 go.mod                                             |    1 -
 go.sum                                             |    2 -
 pkg/wal/README.md                                  |  147 ---
 pkg/wal/wal.go                                     | 1081 --------------------
 pkg/wal/wal_benchmark_test.go                      |  450 --------
 pkg/wal/wal_test.go                                |  242 -----
 test/docker/base-compose.yml                       |    3 +-
 test/e2e-v2/script/env                             |    2 +-
 test/stress/istio/testdata/groups/group.yaml       |   21 -
 test/stress/trace/docker-compose.yaml              |    3 +-
 test/stress/trace/env                              |   15 +-
 ui/src/components/Aside/index.vue                  |   10 -
 17 files changed, 12 insertions(+), 2049 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7dcb06f3..6f2b85d8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,9 @@ Release Notes.
   - Merge memory data and disk data.
 - Add HTTP services to TopNAggregation operations.
 - Add preload for the TopN query of index.
+- Remove "TREE" index type. The "TREE" index type is merged into "INVERTED" 
index type.
+- Remove "Location" field on IndexRule. Currently, the location of index is in 
a segment.
+- Remove "BlockInterval" from Group. The block size is determined by the part.
 
 ### Bugs
 
diff --git a/dist/LICENSE b/dist/LICENSE
index 3d7db32b..66b9ff1d 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -266,7 +266,6 @@ BSD-3-Clause licenses
     github.com/fsnotify/fsnotify v1.7.0 BSD-3-Clause
     github.com/gogo/protobuf v1.3.2 BSD-3-Clause
     github.com/golang/protobuf v1.5.3 BSD-3-Clause
-    github.com/golang/snappy v0.0.4 BSD-3-Clause
     github.com/google/go-cmp v0.6.0 BSD-3-Clause
     github.com/google/uuid v1.4.0 BSD-3-Clause
     github.com/gorilla/websocket v1.5.1 BSD-3-Clause
diff --git a/dist/licenses/license-github.com-golang-snappy.txt b/dist/licenses/license-github.com-golang-snappy.txt
deleted file mode 100644
index 6050c10f..00000000
--- a/dist/licenses/license-github.com-golang-snappy.txt
+++ /dev/null
@@ -1,27 +0,0 @@
-Copyright (c) 2011 The Snappy-Go Authors. All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
-   * Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
-   * Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
-   * Neither the name of Google Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/docs/concept/persistence-storage.md b/docs/concept/persistence-storage.md
index 48199985..dd057ed2 100644
--- a/docs/concept/persistence-storage.md
+++ b/docs/concept/persistence-storage.md
@@ -1,11 +1,11 @@
 # Persistence Storage
-Persistence storage is used for unifying data of BanyanDB persistence, including write-ahead logging(WAL), index, and data collected from skywalking and other observability platforms or APM systems. It provides various implementations and IO modes to satisfy the need of different components.
+Persistence storage unifies BanyanDB's persistent data, including indexes and data collected from SkyWalking and other observability platforms or APM systems. It provides various implementations and IO modes to satisfy the needs of different components.
 BanyanDB provides a concise interface that shields the complexity of the implementation from the upper layer. By exposing necessary interfaces, upper components do not need to care how persistence is implemented and avoid dealing with differences between different operating systems.
 
 # Architecture
 BanyanDB uses third-party storage for actual storage, and the file system shields the differences between different platforms and storage systems, allowing developers to operate files as easily as the local file system without worrying about specific details.
 
-For different data models, stored in different locations, such as for meta and wal data, BanyanDB uses a local file system for storage.
+For different data models stored in different locations, such as metadata, BanyanDB uses a local file system for storage.
 
![](https://skywalking.apache.org/doc-graph/banyandb/v0.5.0/local_file_system.png)
 
 For index and data, the architecture of the file system is divided into three layers.
@@ -45,7 +45,7 @@ return: The file instance, can be used for various file operations.
 
 ### Write
 BanyanDB provides two methods for writing files.
-Append mode, which adds new data to the end of a file. This mode is typically used for WAL. And BanyanDB supports vector Append mode, which supports appending consecutive buffers to the end of the file.
+Append mode, which adds new data to the end of a file. BanyanDB also supports vector Append mode, which appends consecutive buffers to the end of the file.
 Flush mode, which flushes all data to one file. It will return an error when writing a directory, the file does not exist or there is not enough space, and the incomplete file will be discarded. The flush operation is atomic, which means the file won't be created if an error happens during the flush process.
 The following is the pseudocode that calls the API in the go style.
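Since the hunk above ends before the pseudocode itself, here is a minimal Go-style sketch of the two write modes just described. The `File` and `FileSystem` interfaces and their method names are illustrative assumptions, not BanyanDB's actual file-system API.

```go
// Illustrative sketch only: these interfaces approximate the append and
// flush modes described above; they are not the real BanyanDB API.
package fswrite

// File is an assumed handle supporting append-style writes.
type File interface {
	// Append adds new data to the end of the file.
	Append(data []byte) (n int, err error)
	// AppendVector appends consecutive buffers to the end of the file.
	AppendVector(buffers ...[]byte) (n int, err error)
}

// FileSystem is an assumed handle supporting atomic whole-file flushes.
type FileSystem interface {
	// Flush writes all data to one file atomically; on error the
	// incomplete file is discarded rather than created.
	Flush(path string, data []byte) error
}

func writeBoth(f File, fs FileSystem) error {
	// Append mode: grow an existing file entry by entry.
	if _, err := f.Append([]byte("entry-1")); err != nil {
		return err
	}
	// Vector append: several consecutive buffers in one call.
	if _, err := f.AppendVector([]byte("entry-2"), []byte("entry-3")); err != nil {
		return err
	}
	// Flush mode: write a complete file in one atomic operation.
	return fs.Flush("meta/snapshot", []byte("complete content"))
}
```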
 
diff --git a/docs/concept/wal.md b/docs/concept/wal.md
deleted file mode 100644
index 9b95c584..00000000
--- a/docs/concept/wal.md
+++ /dev/null
@@ -1,47 +0,0 @@
-# Background
-
-Write Ahead Logging (WAL) is a technique used in databases to ensure that data is not lost due to system crashes or other failures. The basic idea of WAL is to log changes to a database in a separate file before applying them to the database itself. This way, if there is a system failure, the database can be recovered by replaying the log of changes from the WAL file.
-BanyanDB leverages the WAL to enhance the data buffer for schema resource writing. In such a system, write operations are first written to the WAL file before being applied to the interval buffer. This ensures that the log is written to disk before the actual data is written. Hence the term "write ahead".
-
-# Format
-
-![](https://skywalking.apache.org/doc-graph/banyandb/v0.4.0/wal-format.png)
-
-A segment refers to a block of data in the WAL file that contains a sequence of database changes. Once `rotate` is invoked, a new segment is created to continue logging subsequent changes.
-A "WALEntry" is a data unit representing a series of changes to a Series. Each WALEntry is written to a segment.
-
-A WALEntry contains the following fields:
-- Length: 8 bytes, the length of the WALEntry.
-- Series ID: 8 bytes, the same as the Series ID in the request.
-- Count: 4 bytes, how many binary/timestamp pairs are in one WALEntry.
-- Timestamp: 8 bytes.
-- Binary Length: 2 bytes.
-- Binary: the value in the write request.
-
-# Write process
-
-![](https://skywalking.apache.org/doc-graph/banyandb/v0.4.0/wal.png)
-
-The writing process in WAL is as follows:
-
-1. The changes are first written to the write buffer. Those with the same series ID will go to the identical WALEntry.
-2. When the buffer is full, the WALEntry is created, then flushed to the disk. WAL can optionally use the compression algorithm snappy to compress the data on disk. Each WALEntry is appended to the tail of the WAL file on the disk.
-
-When entries in the buffer are flushed to the disk, the callback function returned by the write operation is invoked. You can ignore this function to improve the writing performance, but it risks losing data.
-# Read WAL
-A client could read a single segment by a segment id. When opening the segment file, the reader will decompress the WAL file if the writing compresses the data.
-
-# Rotation
-WAL supports rotation operation to switch to a new segment. The operation closes the currently open segment and opens a new one, returning the closed segment details.
-
-# Delete
-A client could delete a segment closed by the `rotate` operation.
-
-# Configuration
-BanyanDB WAL has the following configuration options:
-
-| Name | Default Value | Introduction |
-| --- | --- | --- |
-| wal_compression | true | Compress the WAL entry or not |
-| wal_file_size | 64MB | The size of the WAL file|
-| wal_buffer_size | 16kB | The size of WAL buffer. |
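Because the package is being removed, a short usage sketch may help readers map the sections above to code. The method signatures match the `WAL` interface deleted from `pkg/wal/wal.go` below; the path, series ID, and payload values are illustrative.

```go
// Usage sketch of the removed pkg/wal API; signatures match the WAL
// interface deleted below, while the concrete values are made up.
package main

import (
	"log"
	"time"

	"github.com/apache/skywalking-banyandb/pkg/wal"
)

func main() {
	// Open (or recover) a WAL directory. DefaultOptions uses 64MB
	// segment files, a 3s buffer-batch interval, and asynchronous flush.
	w, err := wal.New("/tmp/banyandb-wal", wal.DefaultOptions)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// Write returns once the entry is buffered; the callback fires only
	// after the entry has been flushed to the segment file on disk.
	done := make(chan struct{})
	w.Write([]byte("series-1"), time.Now(), []byte("payload"),
		func(seriesID []byte, ts time.Time, data []byte, err error) {
			if err != nil {
				log.Printf("entry was not persisted: %v", err)
			}
			close(done)
		})
	<-done

	// Rotate closes the working segment and returns it; only a closed
	// segment may be deleted.
	closed, err := w.Rotate()
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Delete(closed.GetSegmentID()); err != nil {
		log.Fatal(err)
	}
}
```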
diff --git a/go.mod b/go.mod
index a9d0243b..3df6b15c 100644
--- a/go.mod
+++ b/go.mod
@@ -83,7 +83,6 @@ require (
        github.com/gogo/protobuf v1.3.2 // indirect
        github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
        github.com/golang/protobuf v1.5.3 // indirect
-       github.com/golang/snappy v0.0.4
        github.com/google/btree v1.1.2 // indirect
        github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect
        github.com/gorilla/websocket v1.5.1 // indirect
diff --git a/go.sum b/go.sum
index 9546f716..08274c9f 100644
--- a/go.sum
+++ b/go.sum
@@ -138,8 +138,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
 github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
-github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
 github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
diff --git a/pkg/wal/README.md b/pkg/wal/README.md
deleted file mode 100644
index 82068238..00000000
--- a/pkg/wal/README.md
+++ /dev/null
@@ -1,147 +0,0 @@
-# WAL
-
-Deprecation Notice: This package is deprecated and will be removed in the future.
-
-## Benchmark
-
-Testing environment:
-
-```text
-MacBook Pro (13-inch, M1, 2020)
-Processor: Apple M1
-Memory: 16 GB
-CPU: 8 cores
-```
-
-Command used
-
-```shell
-go test -bench=. -benchmem -run=^$ -benchtime=10s -count 1 -memprofile=mem_profile.out -cpuprofile=cpu_profile.out
-```
-
-Test report
-
-```text
-goos: darwin
-goarch: arm64
-pkg: github.com/apache/skywalking-banyandb/pkg/wal
-
-Benchmark_SeriesID_1-8                                                    299770             41357 ns/op            2654 B/op          3 allocs/op
-Benchmark_SeriesID_20-8                                                   245113             42125 ns/op            1916 B/op          5 allocs/op
-Benchmark_SeriesID_100-8                                                  296856             42177 ns/op            1291 B/op          7 allocs/op
-Benchmark_SeriesID_500-8                                                  275554             42360 ns/op            1543 B/op          7 allocs/op
-Benchmark_SeriesID_1000-8                                                 289639             39556 ns/op            1543 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_64K-8                                      282884             38827 ns/op            1543 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_128K-8                                     606891             20238 ns/op            1534 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_512K-8                                    1958060              5764 ns/op            1553 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_1MB-8                                     4478250              2738 ns/op            1522 B/op          6 allocs/op
-Benchmark_SeriesID_1000_Buffer_2MB-8                                     7515986              1537 ns/op            1818 B/op          5 allocs/op
-Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush-8                         7249369              1676 ns/op            1542 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush-8                        7443198              1632 ns/op            1534 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush-8                        7239253              1631 ns/op            1553 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush-8                         8047040              1497 ns/op            1521 B/op          6 allocs/op
-Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush-8                         7938543              1508 ns/op            1818 B/op          5 allocs/op
-Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB-8         7526020              1610 ns/op            1542 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB-8         7432317              1591 ns/op            1542 B/op          7 allocs/op
-Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB-8         7764439              1568 ns/op            1542 B/op          7 allocs/op
-```
-
-*Conclusions from the benchmark:*
-- Turning off synchronous flushing improves write performance.
-- Increasing the buffer size improves throughput (see the tuning sketch below).
-
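As a rough illustration of those two findings, the sketch below maps them onto the package's `Options` struct (fields as defined in `pkg/wal/wal.go` in this patch); the concrete values are illustrative, not recommendations.

```go
// Tuning sketch: a larger buffer plus asynchronous flushing favors
// throughput, per the benchmark conclusions above. Values are examples.
package main

import (
	"log"
	"time"

	"github.com/apache/skywalking-banyandb/pkg/wal"
)

func main() {
	opts := &wal.Options{
		FileSize:            64 << 20,        // 64MB segment files
		BufferSize:          2 << 20,         // larger buffer for throughput
		BufferBatchInterval: 3 * time.Second, // flush at least every 3s
		SyncFlush:           false,           // skip per-flush fsync for speed
	}
	w, err := wal.New("/tmp/wal-bench", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()
}
```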
-### CPU profile
-
-open with `go tool pprof -http="127.0.0.1:8080" cpu_profile.out` and you will get the following report
-
-```text
-Flat   Flat%   Sum%    Cum     Cum%    Name    Inlined?
-81.19s 29.74%  29.74%  81.19s  29.74%  runtime.pthread_cond_signal     
-65.81s 24.11%  53.84%  65.81s  24.11%  runtime.pthread_cond_wait       
-49.78s 18.23%  72.08%  49.78s  18.23%  runtime.usleep  
-31.62s 11.58%  83.66%  31.62s  11.58%  syscall.syscall 
-7.06s  2.59%   86.25%  7.06s   2.59%   runtime.pthread_kill    
-6.74s  2.47%   88.71%  6.74s   2.47%   runtime.madvise 
-2.43s  0.89%   89.60%  2.76s   1.01%   github.com/golang/snappy.encodeBlock    
-2.11s  0.77%   90.38%  6.77s   2.48%   runtime.scanobject      
-1.40s  0.51%   90.89%  2.36s   0.86%   runtime.greyobject      
-1.32s  0.48%   91.37%  1.84s   0.67%   runtime.mapaccess1      
-1.03s  0.38%   91.75%  1.47s   0.54%   runtime.findObject      
-0.98s  0.36%   92.11%  38.51s  14.11%  runtime.stealWork       
-0.64s  0.23%   92.34%  2.43s   0.89%   runtime.mallocgc        
-0.61s  0.22%   92.57%  2.34s   0.86%   runtime.mapassign       
-0.33s  0.12%   92.69%  14.24s  5.22%   runtime.gcDrain 
-0.20s  0.07%   92.76%  13.41s  4.91%   runtime.lock2   
-0.20s  0.07%   92.84%  4.42s   1.62%   github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).write   (inline)
-0.12s  0.04%   92.88%  112.36s 41.16%  runtime.findRunnable    
-0.10s  0.04%   92.92%  36.86s  13.50%  runtime.runqgrab        
-0.09s  0.03%   92.95%  1.57s   0.58%   runtime.growslice       
-0.06s  0.02%   92.97%  36.91s  13.52%  github.com/apache/skywalking-banyandb/pkg/wal.(*log).flushBuffer
-0.06s  0.02%   92.99%  1.76s   0.64%   github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).notifyRequests
-0.05s  0.02%   93.01%  106.31s 38.94%  runtime.systemstack     
-0.05s  0.02%   93.03%  6.56s   2.40%   runtime.(*mheap).allocSpan      
-0.05s  0.02%   93.05%  5.78s   2.12%   runtime.(*gcWork).balance       
-0.04s  0.01%   93.06%  8.32s   3.05%   runtime.gcBgMarkWorker  
-0.03s  0.01%   93.07%  36.89s  13.51%  runtime.runqsteal       
-0.02s  0.01%   93.08%  67.03s  24.55%  runtime.semasleep       
-0.02s  0.01%   93.09%  117.19s 42.93%  runtime.schedule        
-0.02s  0.01%   93.10%  6.49s   2.38%   runtime.preemptone      
-0.02s  0.01%   93.10%  2.80s   1.03%   github.com/golang/snappy.Encode 
-0.02s  0.01%   93.11%  7.13s   2.61%   github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func1
-0.01s  0.00%   93.11%  1.41s   0.52%   time.NewTimer   
-0.01s  0.00%   93.12%  83.77s  30.68%  runtime.wakep   
-0.01s  0.00%   93.12%  7.07s   2.59%   runtime.signalM (inline)
-0.01s  0.00%   93.12%  65.70s  24.07%  runtime.notesleep       
-0.01s  0.00%   93.13%  3.79s   1.39%   runtime.newstack        
-0.01s  0.00%   93.13%  78.99s  28.93%  runtime.goready.func1   
-0.01s  0.00%   93.14%  2.60s   0.95%   runtime.forEachP        
-0.01s  0.00%   93.14%  5.94s   2.18%   runtime.(*mheap).alloc.func1    
-0.01s  0.00%   93.14%  26.23s  9.61%   os.(*File).Write        
-0.01s  0.00%   93.15%  26.22s  9.60%   internal/poll.(*FD).Write       
-0.01s  0.00%   93.15%  31.28s  11.46%  github.com/apache/skywalking-banyandb/pkg/wal.(*log).writeWorkSegment
-0.01s  0.00%   93.15%  38.70s  14.18%  github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func2
-0.01s  0.00%   93.16%  2.81s   1.03%   github.com/apache/skywalking-banyandb/pkg/wal.(*bufferWriter).WriteData
-0      0.00%   93.16%  26.21s  9.60%   syscall.write   
-0      0.00%   93.16%  5.04s   1.85%   syscall.fcntl   
-0      0.00%   93.16%  26.21s  9.60%   syscall.Write   (inline)
-0      0.00%   93.16%  6.30s   2.31%   runtime.sysUsedOS       (inline)
-0      0.00%   93.16%  6.31s   2.31%   runtime.sysUsed (inline)
-0      0.00%   93.16%  66.66s  24.42%  runtime.stopm   
-0      0.00%   93.16%  80.05s  29.32%  runtime.startm  
-0      0.00%   93.16%  1.42s   0.52%   runtime.startTheWorldWithSema   
-0      0.00%   93.16%  81.29s  29.78%  runtime.semawakeup      
-0      0.00%   93.16%  4.51s   1.65%   runtime.resetspinning   
-0      0.00%   93.16%  78.98s  28.93%  runtime.ready   
-0      0.00%   93.16%  7.07s   2.59%   runtime.preemptM        
-0      0.00%   93.16%  115.92s 42.46%  runtime.park_m  
-0      0.00%   93.16%  13.03s  4.77%   runtime.osyield (inline)
-0      0.00%   93.16%  80.58s  29.52%  runtime.notewakeup      
-0      0.00%   93.16%  3.58s   1.31%   runtime.morestack       
-0      0.00%   93.16%  116.10s 42.53%  runtime.mcall   
-0      0.00%   93.16%  1.45s   0.53%   runtime.markroot        
-0      0.00%   93.16%  65.70s  24.07%  runtime.mPark   (inline)
-0      0.00%   93.16%  13.41s  4.91%   runtime.lockWithRank    (inline)
-0      0.00%   93.16%  13.41s  4.91%   runtime.lock    (inline)
-0      0.00%   93.16%  3.35s   1.23%   runtime.goschedImpl     
-0      0.00%   93.16%  3.32s   1.22%   runtime.gopreempt_m     
-0      0.00%   93.16%  1.49s   0.55%   runtime.gcMarkDone.func1        
-0      0.00%   93.16%  14.24s  5.22%   runtime.gcBgMarkWorker.func2    
-0      0.00%   93.16%  5.55s   2.03%   runtime.(*gcControllerState).enlistWorker
-0      0.00%   93.16%  26.22s  9.60%   os.(*File).write        (inline)
-0      0.00%   93.16%  5.04s   1.85%   os.(*File).Sync 
-0      0.00%   93.16%  26.21s  9.60%   internal/poll.ignoringEINTRIO   (inline)
-0      0.00%   93.16%  5.04s   1.85%   internal/poll.ignoringEINTR     (inline)
-0      0.00%   93.16%  5.04s   1.85%   internal/poll.(*FD).Fsync.func1 (inline)
-0      0.00%   93.16%  5.04s   1.85%   internal/poll.(*FD).Fsync
-```
-
-### Memory profile
-
-open with `go tool pprof -http="127.0.0.1:8080" mem_profile.out` and you will get the following report
-
-```text
-Flat           Flat%   Sum%    Cum             Cum%    Name
-117496.32MB    87.39%  87.39%  117496.32MB     87.39%  github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).write
-16789.80MB     12.49%  99.88%  16789.80MB      12.49%  time.NewTimer
-2.50MB         0.00%   99.88%  134335.12MB     99.91%  github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func1
-```
\ No newline at end of file
diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go
deleted file mode 100644
index eb9d0db0..00000000
--- a/pkg/wal/wal.go
+++ /dev/null
@@ -1,1081 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package wal (Write-ahead logging) is an independent component to ensure data reliability.
-package wal
-
-import (
-       "bytes"
-       "container/list"
-       "encoding/binary"
-       "fmt"
-       "math"
-       "os"
-       "path/filepath"
-       "sort"
-       "strconv"
-       "strings"
-       "sync"
-       "syscall"
-       "time"
-
-       "github.com/golang/snappy"
-       "github.com/pkg/errors"
-       "go.uber.org/multierr"
-
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/encoding"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/run"
-)
-
-const (
-       moduleName             = "wal"
-       segmentNamePrefix      = "seg"
-       segmentNameSuffix      = ".wal"
-       batchLength            = 8
-       entryLength            = 8
-       seriesIDLength         = 2
-       seriesCountLength      = 4
-       timestampVolumeLength  = 8
-       timestampsBinaryLength = 2
-       valuesBinaryLength     = 2
-       parseTimeStr           = "2006-01-02 15:04:05"
-       maxRetries             = 3
-       maxSegmentID           = uint64(math.MaxUint64) - 1
-       defaultSyncFlush       = false
-)
-
-// DefaultOptions for Open().
-var DefaultOptions = &Options{
-       FileSize:            67108864, // 64MB
-       BufferSize:          65535,    // 16KB
-       BufferBatchInterval: 3 * time.Second,
-       SyncFlush:           defaultSyncFlush,
-}
-
-// Options for creating Write-ahead Logging.
-type Options struct {
-       FileSize            int
-       BufferSize          int
-       BufferBatchInterval time.Duration
-       FlushQueueSize      int
-       SyncFlush           bool
-}
-
-// WAL denotes a Write-ahead logging.
-// Modules who want their data reliable could write data to an instance of WAL.
-// A WAL combines several segments, ingesting data on a single opened one.
-// Rotating the WAL will create a new segment, marking it as opened and persisting previous segments on the disk.
-type WAL interface {
-       // Write a logging entity.
-       // It will return immediately when the data is written in the buffer,
-       // The callback function will be called when the entity is flushed on the persistent storage.
-       Write(seriesID []byte, timestamp time.Time, data []byte, callback func([]byte, time.Time, []byte, error))
-       // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (Segment, error)
-       // ReadAllSegments reads all segments sorted by their creation time in ascending order.
-       ReadAllSegments() ([]Segment, error)
-       // Rotate closes the open segment and opens a new one, returning the closed segment details.
-       Rotate() (Segment, error)
-       // Delete the specified segment.
-       Delete(segmentID SegmentID) error
-       // Close all of segments and stop WAL work.
-       Close() error
-}
-
-// SegmentID identifies a segment in a WAL.
-type SegmentID uint64
-
-// Segment allows reading underlying segments that hold WAL entities.
-type Segment interface {
-       GetSegmentID() SegmentID
-       GetLogEntries() []LogEntry
-}
-
-// LogEntry is used to obtain the detailed values of a WAL entry.
-type LogEntry interface {
-       GetSeriesID() []byte
-       GetTimestamps() []time.Time
-       GetValues() *list.List
-}
-
-// log implements the WAL interface.
-type log struct {
-       writeCloser     *run.ChannelCloser
-       flushCloser     *run.ChannelCloser
-       chanGroupCloser *run.ChannelGroupCloser
-       buffer          buffer
-       logger          *logger.Logger
-       bufferWriter    *bufferWriter
-       segmentMap      map[SegmentID]*segment
-       workSegment     *segment
-       writeChannel    chan logRequest
-       flushChannel    chan buffer
-       path            string
-       options         Options
-       rwMutex         sync.RWMutex
-       closerOnce      sync.Once
-}
-
-type segment struct {
-       file       *os.File
-       path       string
-       logEntries []LogEntry
-       segmentID  SegmentID
-}
-
-type logRequest struct {
-       seriesID  []byte
-       timestamp time.Time
-       callback  func([]byte, time.Time, []byte, error)
-       data      []byte
-}
-
-type logEntry struct {
-       timestamps  []time.Time
-       values      *list.List
-       seriesID    []byte
-       entryLength uint64
-       count       uint32
-}
-
-type buffer struct {
-       timestampMap map[logSeriesID][]time.Time
-       valueMap     map[logSeriesID][]byte
-       callbackMap  map[logSeriesID][]func([]byte, time.Time, []byte, error)
-       count        int
-}
-
-type bufferWriter struct {
-       buf           *bytes.Buffer
-       timestampsBuf *bytes.Buffer
-       seriesID      *logSeriesID
-       dataBuf       []byte
-       batchLen      uint64
-       seriesCount   uint32
-       dataLen       int
-}
-
-type logSeriesID struct {
-       key     string
-       byteLen int
-}
-
-func newLogSeriesID(b []byte) logSeriesID {
-       return logSeriesID{key: convert.BytesToString(b), byteLen: len(b)}
-}
-
-func (s logSeriesID) string() string {
-       return s.key
-}
-
-func (s logSeriesID) bytes() []byte {
-       return convert.StringToBytes(s.key)
-}
-
-func (s logSeriesID) len() int {
-       return s.byteLen
-}
-
-// New creates a WAL instance in the specified path.
-func New(path string, options *Options) (WAL, error) {
-       //  Check configuration options.
-       walOptions := DefaultOptions
-       if options != nil {
-               fileSize := options.FileSize
-               if fileSize <= 0 {
-                       fileSize = DefaultOptions.FileSize
-               }
-               bufferSize := options.BufferSize
-               if bufferSize <= 0 {
-                       bufferSize = DefaultOptions.BufferSize
-               }
-               bufferBatchInterval := options.BufferBatchInterval
-               if bufferBatchInterval <= 0 {
-                       bufferBatchInterval = DefaultOptions.BufferBatchInterval
-               }
-               walOptions = &Options{
-                       FileSize:            fileSize,
-                       BufferSize:          bufferSize,
-                       BufferBatchInterval: bufferBatchInterval,
-                       SyncFlush:           options.SyncFlush,
-               }
-       }
-
-       // Initial WAL path.
-       path, err := filepath.Abs(path)
-       if err != nil {
-               return nil, errors.Wrap(err, "Can not get absolute path: "+path)
-       }
-       if err := os.MkdirAll(path, os.ModePerm); err != nil {
-               return nil, err
-       }
-
-       writeCloser := run.NewChannelCloser()
-       flushCloser := run.NewChannelCloser()
-       chanGroupCloser := run.NewChannelGroupCloser(writeCloser, flushCloser)
-       log := &log{
-               path:            path,
-               options:         *walOptions,
-               logger:          logger.GetLogger(moduleName),
-               writeChannel:    make(chan logRequest),
-               flushChannel:    make(chan buffer, walOptions.FlushQueueSize),
-               bufferWriter:    newBufferWriter(),
-               writeCloser:     writeCloser,
-               flushCloser:     flushCloser,
-               chanGroupCloser: chanGroupCloser,
-               buffer: buffer{
-                       timestampMap: make(map[logSeriesID][]time.Time),
-                       valueMap:     make(map[logSeriesID][]byte),
-                       callbackMap:  make(map[logSeriesID][]func([]byte, time.Time, []byte, error)),
-                       count:        0,
-               },
-       }
-       if err := log.load(); err != nil {
-               return nil, err
-       }
-       log.start()
-
-       log.logger.Info().Str("path", path).Msg("WAL has been initialized")
-       return log, nil
-}
-
-// Write a logging entity.
-// It will return immediately when the data is written in the buffer,
-// The callback function will be called when the entity is flushed on the persistent storage.
-func (log *log) Write(seriesID []byte, timestamp time.Time, data []byte, callback func([]byte, time.Time, []byte, error)) {
-       if !log.writeCloser.AddSender() {
-               return
-       }
-       defer log.writeCloser.SenderDone()
-
-       log.writeChannel <- logRequest{
-               seriesID:  seriesID,
-               timestamp: timestamp,
-               data:      data,
-               callback:  callback,
-       }
-}
-
-// Read specified segment by SegmentID.
-func (log *log) Read(segmentID SegmentID) (Segment, error) {
-       log.rwMutex.RLock()
-       defer log.rwMutex.RUnlock()
-
-       segment := log.segmentMap[segmentID]
-       return segment, nil
-}
-
-// ReadAllSegments reads all segments sorted by their creation time in ascending order.
-func (log *log) ReadAllSegments() ([]Segment, error) {
-       log.rwMutex.RLock()
-       defer log.rwMutex.RUnlock()
-
-       segments := make([]Segment, 0)
-       for _, segment := range log.segmentMap {
-               segments = append(segments, segment)
-       }
-       sort.Slice(segments, func(i, j int) bool { return segments[i].GetSegmentID() < segments[j].GetSegmentID() })
-       return segments, nil
-}
-
-// Rotate closes the open segment and opens a new one, returning the closed segment details.
-func (log *log) Rotate() (Segment, error) {
-       log.rwMutex.Lock()
-       defer log.rwMutex.Unlock()
-
-       newSegmentID := uint64(log.workSegment.segmentID) + 1
-       if newSegmentID > maxSegmentID {
-               return nil, errors.New("Segment ID overflow uint64," + " please delete all WAL segment files and restart")
-       }
-       if err := log.workSegment.file.Close(); err != nil {
-               return nil, errors.Wrap(err, "Close WAL segment error")
-       }
-
-       // Create new segment.
-       oldWorkSegment := log.workSegment
-       segment := &segment{
-               segmentID: SegmentID(newSegmentID),
-               path:      filepath.Join(log.path, segmentName(newSegmentID)),
-       }
-       if err := segment.openFile(true); err != nil {
-               return nil, errors.Wrap(err, "Open WAL segment error")
-       }
-       log.workSegment = segment
-
-       // Update segment information.
-       log.segmentMap[log.workSegment.segmentID] = log.workSegment
-       return oldWorkSegment, nil
-}
-
-// Delete the specified segment.
-func (log *log) Delete(segmentID SegmentID) error {
-       log.rwMutex.Lock()
-       defer log.rwMutex.Unlock()
-
-       // Segment which will be deleted must be closed.
-       if segmentID == log.workSegment.segmentID {
-               return errors.New("Can not delete the segment which is working")
-       }
-       defer delete(log.segmentMap, segmentID)
-       err := os.Remove(log.segmentMap[segmentID].path)
-       if err == nil {
-               return nil
-       }
-       var pathErr *os.PathError
-       if errors.As(err, &pathErr) && errors.Is(pathErr.Err, syscall.ENOENT) {
-               return nil
-       }
-       return errors.Wrap(err, "Delete WAL segment error")
-}
-
-// Close all of segments and stop WAL work.
-func (log *log) Close() error {
-       var globalErr error
-       log.closerOnce.Do(func() {
-               log.logger.Info().Msg("Closing WAL...")
-
-               log.chanGroupCloser.CloseThenWait()
-
-               if err := log.flushBuffer(log.buffer); err != nil {
-                       globalErr = multierr.Append(globalErr, err)
-               }
-               if err := log.workSegment.file.Close(); err != nil {
-                       globalErr = multierr.Append(globalErr, err)
-               }
-               log.logger.Info().Msg("Closed WAL")
-       })
-       return globalErr
-}
-
-func (log *log) start() {
-       var initialTasks sync.WaitGroup
-       initialTasks.Add(2)
-
-       go func() {
-               if !log.writeCloser.AddReceiver() {
-                       panic("writeCloser already closed")
-               }
-               defer log.writeCloser.ReceiverDone()
-
-               log.logger.Info().Msg("Start batch task...")
-               initialTasks.Done()
-
-               bufferVolume := 0
-               for {
-                       timer := time.NewTimer(log.options.BufferBatchInterval)
-                       select {
-                       case request, chOpen := <-log.writeChannel:
-                               if !chOpen {
-                                       timer.Stop()
-                                       log.logger.Info().Msg("Stop batch task when write-channel closed")
-                                       return
-                               }
-
-                               log.buffer.write(request)
-                               if log.logger.Debug().Enabled() {
-                                       log.logger.Debug().Msg("Write request to buffer. elements: " + strconv.Itoa(log.buffer.count))
-                               }
-
-                               bufferVolume += len(request.seriesID) + timestampVolumeLength + len(request.data)
-                               if bufferVolume > log.options.BufferSize {
-                                       log.triggerFlushing()
-                                       bufferVolume = 0
-                               }
-                       case <-timer.C:
-                               if bufferVolume == 0 {
-                                       continue
-                               }
-                               log.triggerFlushing()
-                               bufferVolume = 0
-                       case <-log.writeCloser.CloseNotify():
-                               timer.Stop()
-                               log.logger.Info().Msg("Stop batch task when close notify")
-                               return
-                       }
-                       timer.Stop()
-               }
-       }()
-
-       go func() {
-               if !log.flushCloser.AddReceiver() {
-                       panic("flushCloser already closed")
-               }
-               defer log.flushCloser.ReceiverDone()
-
-               log.logger.Info().Msg("Start flush task...")
-               initialTasks.Done()
-
-               for {
-                       select {
-                       case batch, chOpen := <-log.flushChannel:
-                               if !chOpen {
-                                       log.logger.Info().Msg("Stop flush task when flush-channel closed")
-                                       return
-                               }
-
-                               startTime := time.Now()
-                               var err error
-                               for i := 0; i < maxRetries; i++ {
-                                       if err = log.flushBuffer(batch); err != nil {
-                                               log.logger.Err(err).Msg("Flushing buffer failed. Retrying...")
-                                               time.Sleep(time.Second)
-                                               continue
-                                       }
-                                       break
-                               }
-                               if log.logger.Debug().Enabled() {
-                                       log.logger.Debug().Msg("Flushed buffer to WAL file. elements: " +
-                                               strconv.Itoa(batch.count) + ", cost: " + time.Since(startTime).String())
-                               }
-
-                               batch.notifyRequests(err)
-                       case <-log.flushCloser.CloseNotify():
-                               log.logger.Info().Msg("Stop flush task when close notify")
-                               return
-                       }
-               }
-       }()
-
-       initialTasks.Wait()
-       log.logger.Info().Msg("Started WAL")
-}
-
-func (log *log) triggerFlushing() {
-       log.flushChannel <- log.buffer
-       if log.logger.Debug().Enabled() {
-               log.logger.Debug().Msg("Send buffer to flush-channel. elements: " + strconv.Itoa(log.buffer.count))
-       }
-
-       log.newBuffer()
-}
-
-func (log *log) newBuffer() {
-       log.buffer = buffer{
-               timestampMap: make(map[logSeriesID][]time.Time),
-               valueMap:     make(map[logSeriesID][]byte),
-               callbackMap:  make(map[logSeriesID][]func([]byte, time.Time, []byte, error)),
-               count:        0,
-       }
-}
-
-func (log *log) flushBuffer(buffer buffer) error {
-       if buffer.count == 0 {
-               return nil
-       }
-
-       var err error
-       if err = log.bufferWriter.Reset(); err != nil {
-               return errors.Wrap(err, "Reset buffer writer error")
-       }
-
-       for seriesID, timestamps := range buffer.timestampMap {
-               log.bufferWriter.ResetSeries()
-
-               if err = log.bufferWriter.WriteSeriesID(seriesID); err != nil {
-                       return errors.Wrap(err, "Write seriesID error")
-               }
-               log.bufferWriter.WriteTimestamps(timestamps)
-               log.bufferWriter.WriteData(buffer.valueMap[seriesID])
-               if err = log.bufferWriter.AddSeries(); err != nil {
-                       return errors.Wrap(err, "Add series error")
-               }
-       }
-
-       return log.writeWorkSegment(log.bufferWriter.Bytes())
-}
-
-func (log *log) writeWorkSegment(data []byte) error {
-       log.rwMutex.RLock()
-       defer log.rwMutex.RUnlock()
-
-       // Write batch data to WAL segment file
-       if _, err := log.workSegment.file.Write(data); err != nil {
-               return errors.Wrap(err, "Write WAL segment file error, file: "+log.workSegment.path)
-       }
-       if log.options.SyncFlush {
-               if err := log.workSegment.file.Sync(); err != nil {
-                       log.logger.Warn().Msg("Sync WAL segment file to disk failed, file: " + log.workSegment.path)
-               }
-       }
-       return nil
-}
-
-func (log *log) load() error {
-       files, err := os.ReadDir(log.path)
-       if err != nil {
-               return errors.Wrap(err, "Can not read dir: "+log.path)
-       }
-       // Load all of WAL segments.
-       var workSegmentID SegmentID
-       log.segmentMap = make(map[SegmentID]*segment)
-       for _, file := range files {
-               name := file.Name()
-               segmentID, parsePathErr := parseSegmentID(name)
-               if parsePathErr != nil {
-                       return errors.Wrap(parsePathErr, "Parse file name error, name: "+name)
-               }
-               if segmentID > uint64(workSegmentID) {
-                       workSegmentID = SegmentID(segmentID)
-               }
-               segment := &segment{
-                       segmentID: SegmentID(segmentID),
-                       path:      filepath.Join(log.path, segmentName(segmentID)),
-               }
-               if err = segment.parseLogEntries(); err != nil {
-                       return errors.Wrap(err, "Fail to parse log entries")
-               }
-               log.segmentMap[SegmentID(segmentID)] = segment
-
-               if log.logger.Debug().Enabled() {
-                       log.logger.Debug().Msg("Loaded segment file: " + segment.path)
-               }
-       }
-
-       // If load first time.
-       if len(log.segmentMap) == 0 {
-               segmentID := SegmentID(1)
-               segment := &segment{
-                       segmentID: segmentID,
-                       path:      filepath.Join(log.path, segmentName(uint64(segmentID))),
-               }
-               log.segmentMap[segmentID] = segment
-               log.workSegment = segment
-       } else {
-               log.workSegment = log.segmentMap[workSegmentID]
-       }
-       if err = log.workSegment.openFile(false); err != nil {
-               return errors.Wrap(err, "Open WAL segment error, file: "+log.workSegment.path)
-       }
-       return nil
-}
-
-func newBufferWriter() *bufferWriter {
-       return &bufferWriter{
-               buf:           bytes.NewBuffer([]byte{}),
-               timestampsBuf: bytes.NewBuffer([]byte{}),
-               dataBuf:       make([]byte, 128),
-       }
-}
-
-func (w *bufferWriter) Reset() error {
-       w.ResetSeries()
-       w.buf.Reset()
-       w.batchLen = 0
-
-       // pre-placement padding
-       err := w.writeBatchLength(0)
-       return err
-}
-
-func (w *bufferWriter) ResetSeries() {
-       w.timestampsBuf.Reset()
-       w.dataLen = 0
-       w.seriesID = nil
-       w.seriesCount = 0
-}
-
-func (w *bufferWriter) AddSeries() error {
-       seriesIDBytesLen := uint16(w.seriesID.len())
-       timestampsBytesLen := uint16(w.timestampsBuf.Len())
-       entryLen := seriesIDLength + uint64(seriesIDBytesLen) + seriesCountLength + timestampsBinaryLength + uint64(timestampsBytesLen) + uint64(w.dataLen)
-
-       var err error
-       if err = w.writeEntryLength(entryLen); err != nil {
-               return err
-       }
-       if err = w.writeSeriesIDLength(seriesIDBytesLen); err != nil {
-               return err
-       }
-       if err = w.writeSeriesID(w.seriesID); err != nil {
-               return err
-       }
-       if err = w.writeSeriesCount(w.seriesCount); err != nil {
-               return err
-       }
-       if err = w.writeTimestampsLength(timestampsBytesLen); err != nil {
-               return err
-       }
-       if err = w.writeTimestamps(w.timestampsBuf.Bytes()); err != nil {
-               return err
-       }
-       if err = w.writeData(w.dataBuf[:w.dataLen]); err != nil {
-               return err
-       }
-       w.batchLen += entryLen
-       return nil
-}
-
-func (w *bufferWriter) Bytes() []byte {
-       batchBytes := w.buf.Bytes()
-       batchLen := uint64(len(batchBytes)) - batchLength
-       return w.rewriteBatchLength(batchBytes, batchLen)
-}
-
-func (w *bufferWriter) WriteSeriesID(seriesID logSeriesID) error {
-       w.seriesID = &seriesID
-       return nil
-}
-
-func (w *bufferWriter) WriteTimestamps(timestamps []time.Time) {
-       timestampWriter := encoding.NewWriter()
-       timestampEncoder := encoding.NewXOREncoder(timestampWriter)
-       timestampWriter.Reset(w.timestampsBuf)
-       for _, timestamp := range timestamps {
-               timestampEncoder.Write(timeToUnixNano(timestamp))
-       }
-       timestampWriter.Flush()
-       w.seriesCount = uint32(len(timestamps))
-}
-
-func (w *bufferWriter) WriteData(data []byte) {
-       maxEncodedLen := snappy.MaxEncodedLen(len(data))
-       dataBufLen := len(w.dataBuf)
-       if dataBufLen < maxEncodedLen {
-               newCapacity := (dataBufLen * 2) - (dataBufLen / 2)
-               if newCapacity < maxEncodedLen {
-                       newCapacity = maxEncodedLen
-               }
-               w.dataBuf = make([]byte, newCapacity)
-       }
-       snappyData := snappy.Encode(w.dataBuf, data)
-       w.dataLen = len(snappyData)
-}
-
-func (w *bufferWriter) writeBatchLength(data uint64) error {
-       return writeUint64(w.buf, data)
-}
-
-func (w *bufferWriter) rewriteBatchLength(b []byte, batchLen uint64) []byte {
-       _ = b[7] // early bounds check to guarantee safety of writes below
-       b[0] = byte(batchLen)
-       b[1] = byte(batchLen >> 8)
-       b[2] = byte(batchLen >> 16)
-       b[3] = byte(batchLen >> 24)
-       b[4] = byte(batchLen >> 32)
-       b[5] = byte(batchLen >> 40)
-       b[6] = byte(batchLen >> 48)
-       b[7] = byte(batchLen >> 56)
-       return b
-}
-
-func (w *bufferWriter) writeEntryLength(data uint64) error {
-       return writeUint64(w.buf, data)
-}
-
-func (w *bufferWriter) writeSeriesIDLength(data uint16) error {
-       return writeUint16(w.buf, data)
-}
-
-func (w *bufferWriter) writeSeriesID(data *logSeriesID) error {
-       _, err := w.buf.WriteString(data.string())
-       return err
-}
-
-func (w *bufferWriter) writeSeriesCount(data uint32) error {
-       return writeUint32(w.buf, data)
-}
-
-func (w *bufferWriter) writeTimestampsLength(data uint16) error {
-       return writeUint16(w.buf, data)
-}
-
-func (w *bufferWriter) writeTimestamps(data []byte) error {
-       _, err := w.buf.Write(data)
-       return err
-}
-
-func (w *bufferWriter) writeData(data []byte) error {
-       _, err := w.buf.Write(data)
-       return err
-}
-
-func (segment *segment) GetSegmentID() SegmentID {
-       return segment.segmentID
-}
-
-func (segment *segment) GetLogEntries() []LogEntry {
-       return segment.logEntries
-}
-
-func (segment *segment) openFile(overwrite bool) error {
-       var err error
-       if overwrite {
-               segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm)
-       } else {
-               segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm)
-       }
-       return err
-}
-
-func (segment *segment) parseLogEntries() error {
-       segmentBytes, err := os.ReadFile(segment.path)
-       if err != nil {
-               return errors.Wrap(err, "Read WAL segment failed, path: "+segment.path)
-       }
-
-       var logEntries []LogEntry
-       var data []byte
-       var batchLen uint64
-       var entryLen uint64
-       var seriesIDLen uint16
-       var seriesID []byte
-       var seriesCount uint32
-       var timestampsBinaryLen uint16
-       var entryEndPosition uint64
-
-       var oldPos uint64
-       var pos uint64
-       parseNextBatchFlag := true
-       segmentBytesLen := uint64(len(segmentBytes))
-
-       for {
-               if parseNextBatchFlag {
-                       if segmentBytesLen <= batchLength {
-                               break
-                       }
-                       data = segmentBytes[pos : pos+batchLength]
-                       batchLen, err = segment.parseBatchLength(data)
-                       if err != nil {
-                               return errors.Wrap(err, "Parse batch length error")
-                       }
-
-                       if segmentBytesLen <= batchLen {
-                               break
-                       }
-
-                       pos += batchLength
-                       oldPos = pos
-                       parseNextBatchFlag = false
-               }
-
-               // Parse entryLength.
-               data = segmentBytes[pos : pos+entryLength]
-
-               entryLen, err = segment.parseEntryLength(data)
-               if err != nil {
-                       return errors.Wrap(err, "Parse entry length error")
-               }
-               pos += entryLength
-
-               // Mark entry end-position
-               entryEndPosition = pos + entryLen
-               if segmentBytesLen < entryEndPosition {
-                       break
-               }
-
-               // Parse seriesIDLength.
-               data = segmentBytes[pos : pos+seriesIDLength]
-               seriesIDLen, err = segment.parseSeriesIDLength(data)
-               if err != nil {
-                       return errors.Wrap(err, "Parse seriesID length error")
-               }
-               pos += seriesIDLength
-
-               // Parse seriesID.
-               data = segmentBytes[pos : pos+uint64(seriesIDLen)]
-               seriesID = segment.parseSeriesID(data)
-               pos += uint64(seriesIDLen)
-
-               // Parse series count.
-               data = segmentBytes[pos : pos+seriesCountLength]
-               seriesCount, err = segment.parseSeriesCountLength(data)
-               if err != nil {
-                       return errors.Wrap(err, "Parse series count length error")
-               }
-               pos += seriesCountLength
-
-               // Parse timestamps compression binary.
-               data = segmentBytes[pos : pos+timestampsBinaryLength]
-               timestampsBinaryLen, err = segment.parseTimestampsLength(data)
-               if err != nil {
-                       return errors.Wrap(err, "Parse timestamps length error")
-               }
-               pos += timestampsBinaryLength
-               data = segmentBytes[pos : pos+uint64(timestampsBinaryLen)]
-               var timestamps []time.Time
-               timestamps, err = segment.parseTimestamps(seriesCount, data)
-               if err != nil {
-                       return errors.Wrap(err, "Parse timestamps compression binary error")
-               }
-               pos += uint64(timestampsBinaryLen)
-
-               // Parse values compression binary.
-               data = segmentBytes[pos:entryEndPosition]
-               values, err := segment.parseValuesBinary(data)
-               if err != nil {
-                       return errors.Wrap(err, "Parse values compression binary error")
-               }
-               if values.Len() != int(seriesCount) {
-                       return errors.New("values binary items not match series count. series count: " +
-                               strconv.Itoa(int(seriesCount)) + ", values binary items: " + strconv.Itoa(values.Len()))
-               }
-               pos = entryEndPosition
-
-               logEntry := &logEntry{
-                       entryLength: entryLen,
-                       seriesID:    seriesID,
-                       count:       seriesCount,
-                       timestamps:  timestamps,
-                       values:      values,
-               }
-               logEntries = append(logEntries, logEntry)
-
-               if pos == segmentBytesLen {
-                       break
-               }
-               if pos-oldPos == batchLen {
-                       parseNextBatchFlag = true
-               }
-       }
-       segment.logEntries = logEntries
-       return nil
-}
-
-func (segment *segment) parseBatchLength(data []byte) (uint64, error) {
-       var batchLen uint64
-       buf := bytes.NewBuffer(data)
-       if err := binary.Read(buf, binary.LittleEndian, &batchLen); err != nil {
-               return 0, err
-       }
-       return batchLen, nil
-}
-
-func (segment *segment) parseEntryLength(data []byte) (uint64, error) {
-       var entryLen uint64
-       buf := bytes.NewBuffer(data)
-       if err := binary.Read(buf, binary.LittleEndian, &entryLen); err != nil {
-               return 0, err
-       }
-       return entryLen, nil
-}
-
-func (segment *segment) parseSeriesIDLength(data []byte) (uint16, error) {
-       var seriesIDLen uint16
-       buf := bytes.NewBuffer(data)
-       if err := binary.Read(buf, binary.LittleEndian, &seriesIDLen); err != nil {
-               return 0, err
-       }
-       return seriesIDLen, nil
-}
-
-func (segment *segment) parseSeriesID(data []byte) []byte {
-       return newLogSeriesID(data).bytes()
-}
-
-func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) {
-       var seriesCount uint32
-       buf := bytes.NewBuffer(data)
-       if err := binary.Read(buf, binary.LittleEndian, &seriesCount); err != nil {
-               return 0, err
-       }
-       return seriesCount, nil
-}
-
-func (segment *segment) parseTimestampsLength(data []byte) (uint16, error) {
-       var timestampsLen uint16
-       buf := bytes.NewBuffer(data)
-       if err := binary.Read(buf, binary.LittleEndian, &timestampsLen); err != nil {
-               return 0, err
-       }
-       return timestampsLen, nil
-}
-
-func (segment *segment) parseTimestamps(seriesCount uint32, data []byte) ([]time.Time, error) {
-       timestampReader := encoding.NewReader(bytes.NewReader(data))
-       timestampDecoder := encoding.NewXORDecoder(timestampReader)
-       var timestamps []time.Time
-       for i := 0; i < int(seriesCount); i++ {
-               if !timestampDecoder.Next() {
-                       return nil, errors.New("Timestamps length not match series count")
-               }
-               timestamps = append(timestamps, unixNanoToTime(timestampDecoder.Value()))
-       }
-       return timestamps, nil
-}
-
-func (segment *segment) parseValuesBinary(data []byte) (*list.List, error) {
-       var err error
-       if data, err = snappy.Decode(nil, data); err != nil {
-               return nil, errors.Wrap(err, "Decode values compression binary error")
-       }
-
-       values := list.New()
-       position := 0
-       for {
-               nextPosition, value := readValuesBinary(data, position, valuesBinaryLength)
-               if value == nil {
-                       break
-               }
-               values.PushBack(value)
-               position = nextPosition
-       }
-       return values, nil
-}
-
-func (logEntry *logEntry) GetSeriesID() []byte {
-       return logEntry.seriesID
-}
-
-func (logEntry *logEntry) GetTimestamps() []time.Time {
-       return logEntry.timestamps
-}
-
-func (logEntry *logEntry) GetValues() *list.List {
-       return logEntry.values
-}
-
-func (buffer *buffer) write(request logRequest) {
-       key := newLogSeriesID(request.seriesID)
-       buffer.timestampMap[key] = append(buffer.timestampMap[key], request.timestamp)
-
-       // Value item: binary-length(2-bytes) + binary data(n-bytes)
-       binaryLen := uint16(len(request.data))
-       buffer.valueMap[key] = append(buffer.valueMap[key], byte(binaryLen), byte(binaryLen>>8))
-       buffer.valueMap[key] = append(buffer.valueMap[key], request.data...)
-
-       buffer.callbackMap[key] = append(buffer.callbackMap[key], request.callback)
-       buffer.count++
-}
-
-func (buffer *buffer) notifyRequests(err error) {
-       var timestamps []time.Time
-       var values []byte
-       var valueItem []byte
-       var valuePos int
-       for seriesID, callbacks := range buffer.callbackMap {
-               timestamps = buffer.timestampMap[seriesID]
-               values = buffer.valueMap[seriesID]
-               valuePos = 0
-               for index, callback := range callbacks {
-                       valuePos, valueItem = readValuesBinary(values, valuePos, valuesBinaryLength)
-                       if callback != nil {
-                               buffer.runningCallback(func() {
-                                       callback(seriesID.bytes(), timestamps[index], valueItem, err)
-                               })
-                       }
-               }
-       }
-}
-
-func (buffer *buffer) runningCallback(callback func()) {
-       defer func() {
-               _ = recover()
-       }()
-       callback()
-}
-
-func segmentName(segmentID uint64) string {
-       return fmt.Sprintf("%v%016x%v", segmentNamePrefix, segmentID, segmentNameSuffix)
-}
-
-// Parse segment ID. segmentName example: seg0000000000000001.wal.
-func parseSegmentID(segmentName string) (uint64, error) {
-       _ = segmentName[22:] // early bounds check to guarantee safety of reads below
-       if !strings.HasPrefix(segmentName, segmentNamePrefix) {
-               return 0, errors.New("Invalid segment name: " + segmentName)
-       }
-       if !strings.HasSuffix(segmentName, segmentNameSuffix) {
-               return 0, errors.New("Invalid segment name: " + segmentName)
-       }
-       return strconv.ParseUint(segmentName[3:19], 16, 64)
-}
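
A segment file name is always 23 bytes: the 3-byte "seg" prefix, a zero-padded 16-digit hex ID, and the 4-byte ".wal" suffix; the `_ = segmentName[22:]` statement forces a single bounds check up front so the slices below cannot panic. A round-trip sketch of the naming scheme:

    name := segmentName(27)         // "seg000000000000001b.wal"
    id, err := parseSegmentID(name) // id == 27, err == nil
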
-
-func readValuesBinary(raw []byte, position int, offsetLen int) (int, []byte) {
-       if position == len(raw) {
-               return position, nil
-       }
-
-       data := raw[position : position+offsetLen]
-       binaryLen := bytesToUint16(data)
-       position += offsetLen
-
-       data = raw[position : position+int(binaryLen)]
-       position += int(binaryLen)
-       return position, data
-}
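
Each value item is framed exactly as buffer.write emits it: a 2-byte little-endian length followed by the payload. A minimal framing round-trip sketch, assuming valuesBinaryLength == 2 as the "binary-length(2-bytes)" comment in buffer.write indicates:

    payload := []byte("hello")
    frame := append([]byte{byte(len(payload)), byte(len(payload) >> 8)}, payload...)
    next, got := readValuesBinary(frame, 0, 2) // got == payload, next == len(frame)
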
-
-func writeUint16(buffer *bytes.Buffer, data uint16) error {
-       var err error
-       if err = buffer.WriteByte(byte(data)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 8)); err != nil {
-               return err
-       }
-       return err
-}
-
-func writeUint32(buffer *bytes.Buffer, data uint32) error {
-       var err error
-       if err = buffer.WriteByte(byte(data)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 8)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 16)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 24)); err != nil {
-               return err
-       }
-       return err
-}
-
-func writeUint64(buffer *bytes.Buffer, data uint64) error {
-       var err error
-       if err = buffer.WriteByte(byte(data)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 8)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 16)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 24)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 32)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 40)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 48)); err != nil {
-               return err
-       }
-       if err = buffer.WriteByte(byte(data >> 56)); err != nil {
-               return err
-       }
-       return err
-}
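
These byte-at-a-time writers spell out little-endian order by hand; they produce the same bytes as the standard encoding/binary helpers. For the 64-bit case, an equivalent sketch:

    var scratch [8]byte
    binary.LittleEndian.PutUint64(scratch[:], data)
    buffer.Write(scratch[:]) // identical output to writeUint64(buffer, data)
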
-
-func bytesToUint16(buf []byte) uint16 {
-       return binary.LittleEndian.Uint16(buf)
-}
-
-func timeToUnixNano(time time.Time) uint64 {
-       return uint64(time.UnixNano())
-}
-
-func unixNanoToTime(unixNano uint64) time.Time {
-       return time.Unix(0, int64(unixNano))
-}
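
timeToUnixNano and unixNanoToTime are exact inverses for any instant representable as int64 nanoseconds:

    t := time.Unix(0, 1712534400000000000)
    back := unixNanoToTime(timeToUnixNano(t)) // back.Equal(t) == true
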
diff --git a/pkg/wal/wal_benchmark_test.go b/pkg/wal/wal_benchmark_test.go
deleted file mode 100644
index ba9570f2..00000000
--- a/pkg/wal/wal_benchmark_test.go
+++ /dev/null
@@ -1,450 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package wal (Write-ahead logging) is an independent component to ensure data reliability.
-package wal
-
-import (
-       "crypto/rand"
-       "fmt"
-       "math/big"
-       "os"
-       "path/filepath"
-       "testing"
-       "time"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-var (
-       path         = "benchmark"
-       baseTime     = time.Now().UnixMilli()
-       data         = newBinaryData()
-       dataLen      = len(data)
-       seriesID1    = newSeriesIDList(1)
-       seriesID20   = newSeriesIDList(20)
-       seriesID100  = newSeriesIDList(100)
-       seriesID500  = newSeriesIDList(500)
-       seriesID1000 = newSeriesIDList(1000)
-       callback     = func(seriesID []byte, t time.Time, bytes []byte, err error) {}
-)
-
-func Benchmark_SeriesID_1(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_20(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true})
-       defer closeWAL(wal)
-
-       seriesID := seriesID20
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_100(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true})
-       defer closeWAL(wal)
-
-       seriesID := seriesID100
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_500(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true})
-       defer closeWAL(wal)
-
-       seriesID := seriesID500
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
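
The five benchmarks above differ only in the series list they draw from; the same matrix can be expressed once with sub-benchmarks. A consolidation sketch using b.Run over the fixtures already defined in this file (Benchmark_SeriesID is a hypothetical name):

    func Benchmark_SeriesID(b *testing.B) {
        cases := []struct {
            name   string
            series []SeriesID
        }{
            {"1", seriesID1}, {"20", seriesID20}, {"100", seriesID100},
            {"500", seriesID500}, {"1000", seriesID1000},
        }
        for _, c := range cases {
            b.Run(c.name, func(b *testing.B) {
                wal := newWAL(&Options{SyncFlush: true})
                defer closeWAL(wal)
                b.ResetTimer()
                for i := 0; i < b.N; i++ {
                    wal.Write(c.series[i%len(c.series)].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
                }
            })
        }
    }
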
-
-func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 64})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 128})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 512})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
-       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024 * 2})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 128, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 512, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 1024, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       rotateSize := 1024 * 1024 * 16 // 16MB
-       rotateChan := make(chan struct{})
-       rotateMessage := struct{}{}
-       seriesIDVolume := 16
-       timeVolume := 8
-       var logVolume int
-       var binaryData []byte
-
-       b.ResetTimer()
-       go func() {
-               for range rotateChan {
-                       segment, err := wal.Rotate()
-                       if err != nil {
-                               panic(err)
-                       }
-                       wal.Delete(segment.GetSegmentID())
-               }
-       }()
-       for i := 0; i < b.N; i++ {
-               binaryData = data[i%dataLen].binary
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
-
-               logVolume += seriesIDVolume + timeVolume + len(binaryData)
-               if logVolume >= rotateSize {
-                       rotateChan <- rotateMessage
-                       logVolume = 0
-               }
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       rotateSize := 1024 * 1024 * 32 // 32MB
-       rotateChan := make(chan struct{})
-       rotateMessage := struct{}{}
-       seriesIDVolume := 16
-       timeVolume := 8
-       var logVolume int
-       var binaryData []byte
-
-       b.ResetTimer()
-       go func() {
-               for range rotateChan {
-                       segment, err := wal.Rotate()
-                       if err != nil {
-                               panic(err)
-                       }
-                       wal.Delete(segment.GetSegmentID())
-               }
-       }()
-       for i := 0; i < b.N; i++ {
-               binaryData = data[i%dataLen].binary
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
-
-               logVolume += seriesIDVolume + timeVolume + len(binaryData)
-               if logVolume >= rotateSize {
-                       rotateChan <- rotateMessage
-                       logVolume = 0
-               }
-       }
-       b.StopTimer()
-}
-
-func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
-       defer closeWAL(wal)
-
-       seriesID := seriesID1000
-       seriesIDLen := len(seriesID)
-
-       rotateSize := 1024 * 1024 * 64 // 64MB
-       rotateChan := make(chan struct{})
-       rotateMessage := struct{}{}
-       seriesIDVolume := 16
-       timeVolume := 8
-       var logVolume int
-       var binaryData []byte
-
-       b.ResetTimer()
-       go func() {
-               for range rotateChan {
-                       segment, err := wal.Rotate()
-                       if err != nil {
-                               panic(err)
-                       }
-                       wal.Delete(segment.GetSegmentID())
-               }
-       }()
-       for i := 0; i < b.N; i++ {
-               binaryData = data[i%dataLen].binary
-               wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
-
-               logVolume += seriesIDVolume + timeVolume + len(binaryData)
-               if logVolume >= rotateSize {
-                       rotateChan <- rotateMessage
-                       logVolume = 0
-               }
-       }
-       b.StopTimer()
-}
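
Note that the three rotate benchmarks never close rotateChan, so the draining goroutine stays blocked on the channel after the timer stops. A minimal fix sketch is to close the channel when the benchmark returns, letting the range loop terminate:

    defer close(rotateChan) // ends the `for range rotateChan` goroutine on exit
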
-
-func newWAL(options *Options) WAL {
-       os.RemoveAll(path)
-
-       logger.Init(logger.Logging{Level: "error"})
-       logPath, _ := filepath.Abs(path)
-       if options == nil {
-               options = &Options{
-                       BufferSize:          1024 * 64, // 64KB
-                       BufferBatchInterval: 3 * time.Second,
-               }
-       }
-       wal, err := New(logPath, options)
-       if err != nil {
-               panic(err)
-       }
-       return wal
-}
-
-func closeWAL(wal WAL) {
-       err := wal.Close()
-       if err != nil {
-               panic(err)
-       }
-
-       err = os.RemoveAll(path)
-       if err != nil {
-               panic(err)
-       }
-}
-
-type SeriesID struct {
-       key []byte
-}
-
-func newSeriesIDList(series int) []SeriesID {
-       var seriesIDSet []SeriesID
-       for i := 0; i < series; i++ {
-               seriesID := SeriesID{key: []byte(fmt.Sprintf("series-%d", i))}
-               seriesIDSet = append(seriesIDSet, seriesID)
-       }
-       return seriesIDSet
-}
-
-type Data struct {
-       binary []byte
-}
-
-func newBinaryData() []Data {
-       var data []Data
-       for i := 0; i < 2000; i++ {
-               data = append(data, Data{binary: []byte(randStr())})
-       }
-       return data
-}
-
-func randStr() string {
-       bytes := []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"',:{}[]")
-       result := []byte{}
-
-       var err error
-       var strLengthLower int64 = 200
-       var strLengthUpper int64 = 1024
-       var strLength *big.Int
-       var strRandIndex *big.Int
-       strLength, err = rand.Int(rand.Reader, big.NewInt(strLengthUpper))
-       if err != nil {
-               panic(err)
-       }
-       if strLength.Int64() < strLengthLower {
-               strLength = big.NewInt(strLengthLower)
-       }
-       for i := 0; i < int(strLength.Int64()); i++ {
-               strRandIndex, err = rand.Int(rand.Reader, big.NewInt(int64(len(bytes))))
-               if err != nil {
-                       panic(err)
-               }
-
-               result = append(result, bytes[strRandIndex.Int64()])
-       }
-       return string(result)
-}
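
randStr draws every byte from crypto/rand through big.Int, which is heavyweight for synthetic benchmark fixtures. A cheaper sketch with math/rand (non-cryptographic, which is fine here; randStrFast is a hypothetical replacement mirroring the [200, 1024) length bounds above):

    import mathrand "math/rand"

    const alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"',:{}[]"

    func randStrFast() string {
        n := 200 + mathrand.Intn(824) // length in [200, 1024)
        out := make([]byte, n)
        for i := range out {
            out[i] = alphabet[mathrand.Intn(len(alphabet))]
        }
        return string(out)
    }
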
diff --git a/pkg/wal/wal_test.go b/pkg/wal/wal_test.go
deleted file mode 100644
index d807f970..00000000
--- a/pkg/wal/wal_test.go
+++ /dev/null
@@ -1,242 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package version can be used to implement embedding versioning details from
-// git branches and tags into the binary importing this package.
-package wal_test
-
-import (
-       "bytes"
-       "fmt"
-       "os"
-       "path/filepath"
-       "sync"
-       "time"
-
-       "github.com/onsi/ginkgo/v2"
-       "github.com/onsi/gomega"
-       "github.com/onsi/gomega/gleak"
-
-       "github.com/apache/skywalking-banyandb/pkg/test/flags"
-       "github.com/apache/skywalking-banyandb/pkg/wal"
-)
-
-var _ = ginkgo.Describe("WAL", func() {
-       var (
-               path    string
-               log     wal.WAL
-               options *wal.Options
-               goods   []gleak.Goroutine
-       )
-       ginkgo.BeforeEach(func() {
-               options = &wal.Options{
-                       BufferSize:          1024, // 1KB
-                       BufferBatchInterval: 1 * time.Second,
-               }
-               goods = gleak.Goroutines()
-       })
-       ginkgo.AfterEach(func() {
-               err := os.RemoveAll(path)
-               gomega.Expect(err).ToNot(gomega.HaveOccurred())
-               gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
-       })
-
-       ginkgo.Context("Write and Read", func() {
-               ginkgo.BeforeEach(func() {
-                       var err error
-                       path, err = filepath.Abs("test1")
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       options = &wal.Options{
-                               BufferSize:          1024, // 1KB
-                               BufferBatchInterval: 1 * time.Second,
-                       }
-                       log, err = wal.New(path, options)
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-               })
-
-               ginkgo.AfterEach(func() {
-                       err := log.Close()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-               })
-
-               ginkgo.It("should write and read data correctly", func() {
-                       seriesIDCount := 100
-                       seriesIDElementCount := 20
-                       writeLogCount := seriesIDCount * seriesIDElementCount
-
-                       var wg sync.WaitGroup
-                       wg.Add(writeLogCount)
-                       baseTime := time.Now()
-                       for i := 0; i < seriesIDCount; i++ {
-                               seriesID := []byte(fmt.Sprintf("series-%d", i))
-                               go func() {
-                                       for j := 0; j < seriesIDElementCount; j++ {
-                                               timestamp := time.UnixMilli(baseTime.UnixMilli() + int64(j))
-                                               value := []byte(fmt.Sprintf("value-%d", j))
-                                               callback := func(seriesID []byte, t time.Time, bytes []byte, err error) {
-                                                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-                                                       wg.Done()
-                                               }
-                                               log.Write(seriesID, timestamp, value, callback)
-                                       }
-                               }()
-                       }
-                       wg.Wait()
-
-                       err := log.Close()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       log, err = wal.New(path, options)
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       segments, err := log.ReadAllSegments()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       readLogCount := 0
-
-                       for _, segment := range segments {
-                               entries := segment.GetLogEntries()
-                               for _, entity := range entries {
-                                       seriesID := entity.GetSeriesID()
-                                       // Check seriesID
-                                       gomega.Expect(seriesID != nil).To(gomega.BeTrue())
-
-                                       timestamps := entity.GetTimestamps()
-                                       values := entity.GetValues()
-                                       var timestamp time.Time
-                                       element := values.Front()
-                                       for i := 0; i < len(timestamps); i++ {
-                                               timestamp = timestamps[i]
-
-                                               // Check timestamp
-                                               gomega.Expect(timestamp.UnixMilli() >= baseTime.UnixMilli()).To(gomega.BeTrue())
-                                               gomega.Expect(timestamp.UnixMilli() <= baseTime.UnixMilli()+int64(seriesIDElementCount)).To(gomega.BeTrue())
-
-                                               // Check binary
-                                               elementSequence := timestamp.UnixMilli() - baseTime.UnixMilli()
-                                               value := element.Value.([]byte)
-                                               gomega.Expect(bytes.Equal([]byte(fmt.Sprintf("value-%d", elementSequence)), value)).To(gomega.BeTrue())
-
-                                               readLogCount++
-                                               element = element.Next()
-                                       }
-                               }
-                       }
-
-                       // Check write/read log count
-                       gomega.Expect(writeLogCount == readLogCount).To(gomega.BeTrue())
-               })
-       })
-
-       ginkgo.Context("Rotate", func() {
-               ginkgo.BeforeEach(func() {
-                       var err error
-                       path, err = filepath.Abs("test2")
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       options = &wal.Options{
-                               BufferSize: 1,
-                       }
-                       log, err = wal.New(path, options)
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-               })
-
-               ginkgo.AfterEach(func() {
-                       err := log.Close()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-               })
-
-               ginkgo.It("should rotate correctly", func() {
-                       var wg sync.WaitGroup
-                       writeLogCount := 3
-
-                       wg.Add(writeLogCount)
-                       expectSegments := make(map[wal.SegmentID][]byte)
-                       for i := 0; i < writeLogCount; i++ {
-                               seriesID := []byte(fmt.Sprintf("series-%d", i))
-                               timestamp := time.Now()
-                               value := []byte(fmt.Sprintf("value-%d", i))
-                               callback := func(seriesID []byte, t time.Time, bytes []byte, err error) {
-                                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-                                       // Rotate
-                                       segment, err := log.Rotate()
-                                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                                       expectSegments[segment.GetSegmentID()] = seriesID
-
-                                       wg.Done()
-                               }
-                               log.Write(seriesID, timestamp, value, callback)
-                       }
-                       wg.Wait()
-
-                       err := log.Close()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       log, err = wal.New(path, options)
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-                       // Check segment files
-                       gomega.Expect(len(expectSegments) == writeLogCount).To(gomega.BeTrue())
-                       for segmentID, seriesID := range expectSegments {
-                               segment, err := log.Read(segmentID)
-                               gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                               entries := segment.GetLogEntries()
-                               gomega.Expect(len(entries) == 1).To(gomega.BeTrue())
-                               gomega.Expect(bytes.Equal(entries[0].GetSeriesID(), seriesID)).To(gomega.BeTrue())
-                       }
-               })
-       })
-
-       ginkgo.Context("Delete", func() {
-               ginkgo.BeforeEach(func() {
-                       var err error
-                       path, err = filepath.Abs("test3")
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       log, err = wal.New(path, options)
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-               })
-
-               ginkgo.AfterEach(func() {
-                       err := log.Close()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-               })
-
-               ginkgo.It("should delete correctly", func() {
-                       var err error
-
-                       segments, err := log.ReadAllSegments()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       gomega.Expect(len(segments) == 1).To(gomega.BeTrue())
-
-                       segment, err := log.Rotate()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       segments, err = log.ReadAllSegments()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       gomega.Expect(len(segments) == 2).To(gomega.BeTrue())
-
-                       err = log.Delete(segment.GetSegmentID())
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-                       log.Close()
-                       log, err = wal.New(path, options)
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-                       segments, err = log.ReadAllSegments()
-                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-                       // Check segment files
-                       gomega.Expect(len(segments) == 1).To(gomega.BeTrue())
-               })
-       })
-})
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 7379151c..63e962d5 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -59,7 +59,8 @@ services:
       - sw_agent:/skywalking-java-agent
 
   oap:
-    image: "ghcr.io/apache/skywalking/oap:${SW_OAP_COMMIT}"
+    # TODO: use the main repo image once v0.6.0 is released and merged into the main repo
+    image: "docker.io/hanahmily/oap:${SW_OAP_COMMIT}"
     expose:
       - 11800
       - 12800
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index c7744d4c..e0dc9ddc 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -25,5 +25,5 @@ SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5
 SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
 SW_CTL_COMMIT=6b2eb0011e38b630db6af7203db215806bd141ed
 
-SW_OAP_COMMIT=e5c308d4358d5b02a658b8ad7f153aec99b8e63a
+SW_OAP_COMMIT=cec72540e712e0c9a56c18171fb3e98897b27f11
 SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
diff --git a/test/stress/istio/testdata/groups/group.yaml b/test/stress/istio/testdata/groups/group.yaml
index 45e403cc..1031c1a2 100644
--- a/test/stress/istio/testdata/groups/group.yaml
+++ b/test/stress/istio/testdata/groups/group.yaml
@@ -6,9 +6,6 @@
     modRevision: "0"
     name: measure-default
   resourceOpts:
-    blockInterval:
-      num: 192
-      unit: UNIT_HOUR
     segmentInterval:
       num: 8
       unit: UNIT_DAY
@@ -25,9 +22,6 @@
     modRevision: "0"
     name: measure-minute
   resourceOpts:
-    blockInterval:
-      num: 4
-      unit: UNIT_HOUR
     segmentInterval:
       num: 1
       unit: UNIT_DAY
@@ -44,9 +38,6 @@
     modRevision: "0"
     name: stream-browser_error_log
   resourceOpts:
-    blockInterval:
-      num: 4
-      unit: UNIT_HOUR
     segmentInterval:
       num: 1
       unit: UNIT_DAY
@@ -63,9 +54,6 @@
     modRevision: "0"
     name: stream-default
   resourceOpts:
-    blockInterval:
-      num: 24
-      unit: UNIT_HOUR
     segmentInterval:
       num: 1
       unit: UNIT_DAY
@@ -82,9 +70,6 @@
     modRevision: "0"
     name: stream-log
   resourceOpts:
-    blockInterval:
-      num: 4
-      unit: UNIT_HOUR
     segmentInterval:
       num: 1
       unit: UNIT_DAY
@@ -101,9 +86,6 @@
     modRevision: "0"
     name: stream-segment
   resourceOpts:
-    blockInterval:
-      num: 4
-      unit: UNIT_HOUR
     segmentInterval:
       num: 1
       unit: UNIT_DAY
@@ -120,9 +102,6 @@
     modRevision: "0"
     name: stream-zipkin_span
   resourceOpts:
-    blockInterval:
-      num: 4
-      unit: UNIT_HOUR
     segmentInterval:
       num: 1
       unit: UNIT_DAY
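
With blockInterval removed, each group's resourceOpts in this fixture begins directly with its segment interval (any fields after the three context lines, such as TTL, fall outside these hunks and are unchanged). The measure-default group, for example, now reads:

    resourceOpts:
      segmentInterval:
        num: 8
        unit: UNIT_DAY
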
diff --git a/test/stress/trace/docker-compose.yaml b/test/stress/trace/docker-compose.yaml
index 7ea06551..05cfff66 100644
--- a/test/stress/trace/docker-compose.yaml
+++ b/test/stress/trace/docker-compose.yaml
@@ -53,7 +53,8 @@ services:
     extends:
       file: ../../docker/base-compose.yml
       service: oap
-    image: hanahmily/data-generator:latest
+    # TODO: use the main repo image once v0.6.0 is released and merged into the main repo
+    image: "docker.io/hanahmily/data-generator:${SW_OAP_COMMIT}"
     environment:
       SW_STORAGE: banyandb
     ports:
diff --git a/test/stress/trace/env b/test/stress/trace/env
index 94bce219..3acd4a0a 100644
--- a/test/stress/trace/env
+++ b/test/stress/trace/env
@@ -14,17 +14,4 @@
 # limitations under the License.
 
 
-SW_AGENT_JAVA_COMMIT=3f88d735ba2bfd1196aff946502447d4b14450c8
-SW_AGENT_SATELLITE_COMMIT=ea27a3f4e126a24775fe12e2aa2695bcb23d99c3
-SW_AGENT_NGINX_LUA_COMMIT=c3cee4841798a147d83b96a10914d4ac0e11d0aa
-SW_AGENT_NODEJS_COMMIT=2e7560518aff846befd4d6bc815fe5e38c704a11
-SW_AGENT_GO_COMMIT=4af380c2db6243106b0fc650b6003ce3b3eb82a0
-SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
-SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
-SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
-SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5
-SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
-SW_CTL_COMMIT=6b2eb0011e38b630db6af7203db215806bd141ed
-
-SW_OAP_COMMIT=e5c308d4358d5b02a658b8ad7f153aec99b8e63a
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
+SW_OAP_COMMIT=cec72540e712e0c9a56c18171fb3e98897b27f11
diff --git a/ui/src/components/Aside/index.vue b/ui/src/components/Aside/index.vue
index 00a05663..29a8509e 100644
--- a/ui/src/components/Aside/index.vue
+++ b/ui/src/components/Aside/index.vue
@@ -592,8 +592,6 @@ function openEditGroup() {
     data.groupForm.name = data.groupLists[data.clickIndex].metadata.name
     data.groupForm.catalog = data.groupLists[data.clickIndex].catalog
    data.groupForm.shardNum = data.groupLists[data.clickIndex].resourceOpts?.shardNum
-    data.groupForm.blockIntervalUnit = data.groupLists[data.clickIndex].resourceOpts?.blockInterval?.unit
-    data.groupForm.blockIntervalNum = data.groupLists[data.clickIndex].resourceOpts?.blockInterval?.num
    data.groupForm.segmentIntervalUnit = data.groupLists[data.clickIndex].resourceOpts?.segmentInterval?.unit
    data.groupForm.segmentIntervalNum = data.groupLists[data.clickIndex].resourceOpts?.segmentInterval?.num
    data.groupForm.ttlUnit = data.groupLists[data.clickIndex].resourceOpts?.ttl?.unit
@@ -741,10 +739,6 @@ function createGroupFunction() {
                     catalog: data.groupForm.catalog,
                     resourceOpts: {
                         shardNum: data.groupForm.shardNum,
-                        blockInterval: {
-                            unit: data.groupForm.blockIntervalUnit,
-                            num: data.groupForm.blockIntervalNum
-                        },
                         segmentInterval: {
                             unit: data.groupForm.segmentIntervalUnit,
                             num: data.groupForm.segmentIntervalNum
@@ -788,10 +782,6 @@ function editGroupFunction() {
                     catalog: data.groupForm.catalog,
                     resourceOpts: {
                         shardNum: data.groupForm.shardNum,
-                        blockInterval: {
-                            unit: data.groupForm.blockIntervalUnit,
-                            num: data.groupForm.blockIntervalNum
-                        },
                         segmentInterval: {
                             unit: data.groupForm.segmentIntervalUnit,
                             num: data.groupForm.segmentIntervalNum
