This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch clean-up
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ba8bf559de68892b87fde961bd9776a51ea5de51
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Apr 8 00:14:01 2024 +0000

    Remove wal and fix e2e
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                         |    3 +
 dist/LICENSE                                       |    1 -
 dist/licenses/license-github.com-golang-snappy.txt |   27 -
 docs/concept/persistence-storage.md                |    6 +-
 docs/concept/wal.md                                |   47 -
 go.mod                                             |    1 -
 go.sum                                             |    2 -
 pkg/wal/README.md                                  |  147 ---
 pkg/wal/wal.go                                     | 1081 --------------------
 pkg/wal/wal_benchmark_test.go                      |  450 --------
 pkg/wal/wal_test.go                                |  242 -----
 test/docker/base-compose.yml                       |    3 +-
 test/e2e-v2/script/env                             |    2 +-
 test/stress/istio/testdata/groups/group.yaml       |   21 -
 test/stress/trace/docker-compose.yaml              |    3 +-
 test/stress/trace/env                              |   15 +-
 ui/src/components/Aside/index.vue                  |   10 -
 17 files changed, 12 insertions(+), 2049 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7dcb06f3..6f2b85d8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,9 @@ Release Notes.
 - Merge memory data and disk data.
 - Add HTTP services to TopNAggregation operations.
 - Add preload for the TopN query of index.
+- Remove "TREE" index type. The "TREE" index type is merged into "INVERTED" index type.
+- Remove "Location" field on IndexRule. Currently, the location of index is in a segment.
+- Remove "BlockInterval" from Group. The block size is determined by the part.
### Bugs diff --git a/dist/LICENSE b/dist/LICENSE index 3d7db32b..66b9ff1d 100644 --- a/dist/LICENSE +++ b/dist/LICENSE @@ -266,7 +266,6 @@ BSD-3-Clause licenses github.com/fsnotify/fsnotify v1.7.0 BSD-3-Clause github.com/gogo/protobuf v1.3.2 BSD-3-Clause github.com/golang/protobuf v1.5.3 BSD-3-Clause - github.com/golang/snappy v0.0.4 BSD-3-Clause github.com/google/go-cmp v0.6.0 BSD-3-Clause github.com/google/uuid v1.4.0 BSD-3-Clause github.com/gorilla/websocket v1.5.1 BSD-3-Clause diff --git a/dist/licenses/license-github.com-golang-snappy.txt b/dist/licenses/license-github.com-golang-snappy.txt deleted file mode 100644 index 6050c10f..00000000 --- a/dist/licenses/license-github.com-golang-snappy.txt +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2011 The Snappy-Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/docs/concept/persistence-storage.md b/docs/concept/persistence-storage.md index 48199985..dd057ed2 100644 --- a/docs/concept/persistence-storage.md +++ b/docs/concept/persistence-storage.md @@ -1,11 +1,11 @@ # Persistence Storage -Persistence storage is used for unifying data of BanyanDB persistence, including write-ahead logging(WAL), index, and data collected from skywalking and other observability platforms or APM systems. It provides various implementations and IO modes to satisfy the need of different components. +Persistence storage is used for unifying data of BanyanDB persistence, including index, and data collected from skywalking and other observability platforms or APM systems. It provides various implementations and IO modes to satisfy the need of different components. BanyanDB provides a concise interface that shields the complexity of the implementation from the upper layer. By exposing necessary interfaces, upper components do not need to care how persistence is implemented and avoid dealing with differences between different operating systems. # Architecture BanyanDB uses third-party storage for actual storage, and the file system shields the differences between different platforms and storage systems, allowing developers to operate files as easily as the local file system without worrying about specific details. 
-For different data models, stored in different locations, such as for meta and wal data, BanyanDB uses a local file system for storage. +For different data models, stored in different locations, such as for meta data, BanyanDB uses a local file system for storage. ![](https://skywalking.apache.org/doc-graph/banyandb/v0.5.0/local_file_system.png) For index and data, the architecture of the file system is divided into three layers. @@ -45,7 +45,7 @@ return: The file instance, can be used for various file operations. ### Write BanyanDB provides two methods for writing files. -Append mode, which adds new data to the end of a file. This mode is typically used for WAL. And BanyanDB supports vector Append mode, which supports appending consecutive buffers to the end of the file. +Append mode, which adds new data to the end of a file. BanyanDB also supports vector Append mode, which supports appending consecutive buffers to the end of the file. Flush mode, which flushes all data to one file. It will return an error when writing a directory, the file does not exist or there is not enough space, and the incomplete file will be discarded. The flush operation is atomic, which means the file won't be created if an error happens during the flush process. The following is the pseudocode that calls the API in the go style. diff --git a/docs/concept/wal.md b/docs/concept/wal.md deleted file mode 100644 index 9b95c584..00000000 --- a/docs/concept/wal.md +++ /dev/null @@ -1,47 +0,0 @@ -# Background - -Write Ahead Logging (WAL) is a technique used in databases to ensure that data is not lost due to system crashes or other failures. The basic idea of WAL is to log changes to a database in a separate file before applying them to the database itself. This way, if there is a system failure, the database can be recovered by replaying the log of changes from the WAL file. -BanyanDB leverages the WAL to enhance the data buffer for schema resource writing. 
In such a system, write operations are first written to the WAL file before being applied to the interval buffer. This ensures that the log is written to disk before the actual data is written. Hence the term "write ahead". - -# Format - -![](https://skywalking.apache.org/doc-graph/banyandb/v0.4.0/wal-format.png) - -A segment refers to a block of data in the WAL file that contains a sequence of database changes. Once `rotate` is invoked, a new segment is created to continue logging subsequent changes. -A "WALEntry" is a data unit representing a series of changes to a Series. Each WALEntry is written to a segment. - -WAlEntry contains as follows: -- Length:8 bytes, which means the length of a WalEntry. -- Series ID:8 bytes, the same as request Series ID. -- Count:4 bytes, how many binary/timestamps in one WalEntry. -- Timestamp:8 bytes. -- Binary Length:2 bytes. -- Binary: value in the write request. - -# Write process - -![](https://skywalking.apache.org/doc-graph/banyandb/v0.4.0/wal.png) - -The writing process in WAL is as follows: - -1. Firstly, the changes are first written to the write buffer. Those with the same series ID will go to the identical WALEntry. -2. When the buffer is full, the WALEntry is created, then flushed to the disk. WAL can optionally use the compression algorithm snappy to compress the data on disk. Each WALEntry is appended to the tail of the WAL file on the disk. - -When entries in the buffer are flushed to the disk, the callback function returned by the write operation is invoked. You can ignore this function to improve the writing performance, but it risks losing data. -# Read WAL -A client could read a single segment by a segment id. When opening the segment file, the reader will decompress the WAL file if the writing compresses the data. - -# Rotation -WAL supports rotation operation to switch to a new segment. The operation closes the currently open segment and opens a new one, returning the closed segment details. 
- -# Delete -A client could delete a segment closed by the `rotate` operation. - -# configuration -BanyanDB WAL has the following configuration options: - -| Name | Default Value | Introduction | -| --- | --- | --- | -| wal_compression | true | Compress the WAL entry or not | -| wal_file_size | 64MB | The size of the WAL file| -| wal_buffer_size | 16kB | The size of WAL buffer. | diff --git a/go.mod b/go.mod index a9d0243b..3df6b15c 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/golang/snappy v0.0.4 github.com/google/btree v1.1.2 // indirect github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect github.com/gorilla/websocket v1.5.1 // indirect diff --git a/go.sum b/go.sum index 9546f716..08274c9f 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/pkg/wal/README.md b/pkg/wal/README.md deleted file mode 100644 index 82068238..00000000 --- a/pkg/wal/README.md +++ /dev/null @@ -1,147 +0,0 @@ -# WAL - -Deprecation Notice: This package is deprecated and will be removed in the future. 
- -## Benchmark - -Testing environment: - -```text -MacBook Pro (13-inch, M1, 2020) -Processor: Apple M1 -Memory: 16 GB -CPU: 8 cores -``` - -Command used - -```shell -go test -bench=. -benchmem -run=^$ -benchtime=10s -count 1 -memprofile=mem_profile.out -cpuprofile=cpu_profile.out -``` - -Test report - -```text -goos: darwin -goarch: arm64 -pkg: github.com/apache/skywalking-banyandb/pkg/wal - -Benchmark_SeriesID_1-8 299770 41357 ns/op 2654 B/op 3 allocs/op -Benchmark_SeriesID_20-8 245113 42125 ns/op 1916 B/op 5 allocs/op -Benchmark_SeriesID_100-8 296856 42177 ns/op 1291 B/op 7 allocs/op -Benchmark_SeriesID_500-8 275554 42360 ns/op 1543 B/op 7 allocs/op -Benchmark_SeriesID_1000-8 289639 39556 ns/op 1543 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_64K-8 282884 38827 ns/op 1543 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_128K-8 606891 20238 ns/op 1534 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_512K-8 1958060 5764 ns/op 1553 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_1MB-8 4478250 2738 ns/op 1522 B/op 6 allocs/op -Benchmark_SeriesID_1000_Buffer_2MB-8 7515986 1537 ns/op 1818 B/op 5 allocs/op -Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush-8 7249369 1676 ns/op 1542 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush-8 7443198 1632 ns/op 1534 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush-8 7239253 1631 ns/op 1553 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush-8 8047040 1497 ns/op 1521 B/op 6 allocs/op -Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush-8 7938543 1508 ns/op 1818 B/op 5 allocs/op -Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB-8 7526020 1610 ns/op 1542 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB-8 7432317 1591 ns/op 1542 B/op 7 allocs/op -Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB-8 7764439 1568 ns/op 1542 B/op 7 allocs/op -``` - -*Concluded from the benchmark test:* -- Turning off sync flush file performance is 
better. -- Increasing the buffer size is good for improving throughput. - -### CPU profile - -open with `go tool pprof -http="127.0.0.1:8080" cpu_profile.out` and you will get the following report - -```text -Flat Flat% Sum% Cum Cum% Name Inlined? -81.19s 29.74% 29.74% 81.19s 29.74% runtime.pthread_cond_signal -65.81s 24.11% 53.84% 65.81s 24.11% runtime.pthread_cond_wait -49.78s 18.23% 72.08% 49.78s 18.23% runtime.usleep -31.62s 11.58% 83.66% 31.62s 11.58% syscall.syscall -7.06s 2.59% 86.25% 7.06s 2.59% runtime.pthread_kill -6.74s 2.47% 88.71% 6.74s 2.47% runtime.madvise -2.43s 0.89% 89.60% 2.76s 1.01% github.com/golang/snappy.encodeBlock -2.11s 0.77% 90.38% 6.77s 2.48% runtime.scanobject -1.40s 0.51% 90.89% 2.36s 0.86% runtime.greyobject -1.32s 0.48% 91.37% 1.84s 0.67% runtime.mapaccess1 -1.03s 0.38% 91.75% 1.47s 0.54% runtime.findObject -0.98s 0.36% 92.11% 38.51s 14.11% runtime.stealWork -0.64s 0.23% 92.34% 2.43s 0.89% runtime.mallocgc -0.61s 0.22% 92.57% 2.34s 0.86% runtime.mapassign -0.33s 0.12% 92.69% 14.24s 5.22% runtime.gcDrain -0.20s 0.07% 92.76% 13.41s 4.91% runtime.lock2 -0.20s 0.07% 92.84% 4.42s 1.62% github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).write (inline) -0.12s 0.04% 92.88% 112.36s 41.16% runtime.findRunnable -0.10s 0.04% 92.92% 36.86s 13.50% runtime.runqgrab -0.09s 0.03% 92.95% 1.57s 0.58% runtime.growslice -0.06s 0.02% 92.97% 36.91s 13.52% github.com/apache/skywalking-banyandb/pkg/wal.(*log).flushBuffer -0.06s 0.02% 92.99% 1.76s 0.64% github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).notifyRequests -0.05s 0.02% 93.01% 106.31s 38.94% runtime.systemstack -0.05s 0.02% 93.03% 6.56s 2.40% runtime.(*mheap).allocSpan -0.05s 0.02% 93.05% 5.78s 2.12% runtime.(*gcWork).balance -0.04s 0.01% 93.06% 8.32s 3.05% runtime.gcBgMarkWorker -0.03s 0.01% 93.07% 36.89s 13.51% runtime.runqsteal -0.02s 0.01% 93.08% 67.03s 24.55% runtime.semasleep -0.02s 0.01% 93.09% 117.19s 42.93% runtime.schedule -0.02s 0.01% 93.10% 6.49s 2.38% runtime.preemptone 
-0.02s 0.01% 93.10% 2.80s 1.03% github.com/golang/snappy.Encode -0.02s 0.01% 93.11% 7.13s 2.61% github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func1 -0.01s 0.00% 93.11% 1.41s 0.52% time.NewTimer -0.01s 0.00% 93.12% 83.77s 30.68% runtime.wakep -0.01s 0.00% 93.12% 7.07s 2.59% runtime.signalM (inline) -0.01s 0.00% 93.12% 65.70s 24.07% runtime.notesleep -0.01s 0.00% 93.13% 3.79s 1.39% runtime.newstack -0.01s 0.00% 93.13% 78.99s 28.93% runtime.goready.func1 -0.01s 0.00% 93.14% 2.60s 0.95% runtime.forEachP -0.01s 0.00% 93.14% 5.94s 2.18% runtime.(*mheap).alloc.func1 -0.01s 0.00% 93.14% 26.23s 9.61% os.(*File).Write -0.01s 0.00% 93.15% 26.22s 9.60% internal/poll.(*FD).Write -0.01s 0.00% 93.15% 31.28s 11.46% github.com/apache/skywalking-banyandb/pkg/wal.(*log).writeWorkSegment -0.01s 0.00% 93.15% 38.70s 14.18% github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func2 -0.01s 0.00% 93.16% 2.81s 1.03% github.com/apache/skywalking-banyandb/pkg/wal.(*bufferWriter).WriteData -0 0.00% 93.16% 26.21s 9.60% syscall.write -0 0.00% 93.16% 5.04s 1.85% syscall.fcntl -0 0.00% 93.16% 26.21s 9.60% syscall.Write (inline) -0 0.00% 93.16% 6.30s 2.31% runtime.sysUsedOS (inline) -0 0.00% 93.16% 6.31s 2.31% runtime.sysUsed (inline) -0 0.00% 93.16% 66.66s 24.42% runtime.stopm -0 0.00% 93.16% 80.05s 29.32% runtime.startm -0 0.00% 93.16% 1.42s 0.52% runtime.startTheWorldWithSema -0 0.00% 93.16% 81.29s 29.78% runtime.semawakeup -0 0.00% 93.16% 4.51s 1.65% runtime.resetspinning -0 0.00% 93.16% 78.98s 28.93% runtime.ready -0 0.00% 93.16% 7.07s 2.59% runtime.preemptM -0 0.00% 93.16% 115.92s 42.46% runtime.park_m -0 0.00% 93.16% 13.03s 4.77% runtime.osyield (inline) -0 0.00% 93.16% 80.58s 29.52% runtime.notewakeup -0 0.00% 93.16% 3.58s 1.31% runtime.morestack -0 0.00% 93.16% 116.10s 42.53% runtime.mcall -0 0.00% 93.16% 1.45s 0.53% runtime.markroot -0 0.00% 93.16% 65.70s 24.07% runtime.mPark (inline) -0 0.00% 93.16% 13.41s 4.91% runtime.lockWithRank (inline) -0 0.00% 93.16% 13.41s 
4.91% runtime.lock (inline) -0 0.00% 93.16% 3.35s 1.23% runtime.goschedImpl -0 0.00% 93.16% 3.32s 1.22% runtime.gopreempt_m -0 0.00% 93.16% 1.49s 0.55% runtime.gcMarkDone.func1 -0 0.00% 93.16% 14.24s 5.22% runtime.gcBgMarkWorker.func2 -0 0.00% 93.16% 5.55s 2.03% runtime.(*gcControllerState).enlistWorker -0 0.00% 93.16% 26.22s 9.60% os.(*File).write (inline) -0 0.00% 93.16% 5.04s 1.85% os.(*File).Sync -0 0.00% 93.16% 26.21s 9.60% internal/poll.ignoringEINTRIO (inline) -0 0.00% 93.16% 5.04s 1.85% internal/poll.ignoringEINTR (inline) -0 0.00% 93.16% 5.04s 1.85% internal/poll.(*FD).Fsync.func1 (inline) -0 0.00% 93.16% 5.04s 1.85% internal/poll.(*FD).Fsync -``` - -### Memory profile - -open with `go tool pprof -http="127.0.0.1:8080" mem_profile.out` and you will get the following report - -```text -Flat Flat% Sum% Cum Cum% Name -117496.32MB 87.39% 87.39% 117496.32MB 87.39% github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).write -16789.80MB 12.49% 99.88% 16789.80MB 12.49% time.NewTimer -2.50MB 0.00% 99.88% 134335.12MB 99.91% github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func1 -``` \ No newline at end of file diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go deleted file mode 100644 index eb9d0db0..00000000 --- a/pkg/wal/wal.go +++ /dev/null @@ -1,1081 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package wal (Write-ahead logging) is an independent component to ensure data reliability. -package wal - -import ( - "bytes" - "container/list" - "encoding/binary" - "fmt" - "math" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - "sync" - "syscall" - "time" - - "github.com/golang/snappy" - "github.com/pkg/errors" - "go.uber.org/multierr" - - "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/encoding" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/run" -) - -const ( - moduleName = "wal" - segmentNamePrefix = "seg" - segmentNameSuffix = ".wal" - batchLength = 8 - entryLength = 8 - seriesIDLength = 2 - seriesCountLength = 4 - timestampVolumeLength = 8 - timestampsBinaryLength = 2 - valuesBinaryLength = 2 - parseTimeStr = "2006-01-02 15:04:05" - maxRetries = 3 - maxSegmentID = uint64(math.MaxUint64) - 1 - defaultSyncFlush = false -) - -// DefaultOptions for Open(). -var DefaultOptions = &Options{ - FileSize: 67108864, // 64MB - BufferSize: 65535, // 16KB - BufferBatchInterval: 3 * time.Second, - SyncFlush: defaultSyncFlush, -} - -// Options for creating Write-ahead Logging. -type Options struct { - FileSize int - BufferSize int - BufferBatchInterval time.Duration - FlushQueueSize int - SyncFlush bool -} - -// WAL denotes a Write-ahead logging. -// Modules who want their data reliable could write data to an instance of WAL. -// A WAL combines several segments, ingesting data on a single opened one. 
-// Rotating the WAL will create a new segment, marking it as opened and persisting previous segments on the disk. -type WAL interface { - // Write a logging entity. - // It will return immediately when the data is written in the buffer, - // The callback function will be called when the entity is flushed on the persistent storage. - Write(seriesID []byte, timestamp time.Time, data []byte, callback func([]byte, time.Time, []byte, error)) - // Read specified segment by SegmentID. - Read(segmentID SegmentID) (Segment, error) - // ReadAllSegments reads all segments sorted by their creation time in ascending order. - ReadAllSegments() ([]Segment, error) - // Rotate closes the open segment and opens a new one, returning the closed segment details. - Rotate() (Segment, error) - // Delete the specified segment. - Delete(segmentID SegmentID) error - // Close all of segments and stop WAL work. - Close() error -} - -// SegmentID identities a segment in a WAL. -type SegmentID uint64 - -// Segment allows reading underlying segments that hold WAl entities. -type Segment interface { - GetSegmentID() SegmentID - GetLogEntries() []LogEntry -} - -// LogEntry used for attain detail value of WAL entry. -type LogEntry interface { - GetSeriesID() []byte - GetTimestamps() []time.Time - GetValues() *list.List -} - -// log implements the WAL interface. 
-type log struct { - writeCloser *run.ChannelCloser - flushCloser *run.ChannelCloser - chanGroupCloser *run.ChannelGroupCloser - buffer buffer - logger *logger.Logger - bufferWriter *bufferWriter - segmentMap map[SegmentID]*segment - workSegment *segment - writeChannel chan logRequest - flushChannel chan buffer - path string - options Options - rwMutex sync.RWMutex - closerOnce sync.Once -} - -type segment struct { - file *os.File - path string - logEntries []LogEntry - segmentID SegmentID -} - -type logRequest struct { - seriesID []byte - timestamp time.Time - callback func([]byte, time.Time, []byte, error) - data []byte -} - -type logEntry struct { - timestamps []time.Time - values *list.List - seriesID []byte - entryLength uint64 - count uint32 -} - -type buffer struct { - timestampMap map[logSeriesID][]time.Time - valueMap map[logSeriesID][]byte - callbackMap map[logSeriesID][]func([]byte, time.Time, []byte, error) - count int -} - -type bufferWriter struct { - buf *bytes.Buffer - timestampsBuf *bytes.Buffer - seriesID *logSeriesID - dataBuf []byte - batchLen uint64 - seriesCount uint32 - dataLen int -} - -type logSeriesID struct { - key string - byteLen int -} - -func newLogSeriesID(b []byte) logSeriesID { - return logSeriesID{key: convert.BytesToString(b), byteLen: len(b)} -} - -func (s logSeriesID) string() string { - return s.key -} - -func (s logSeriesID) bytes() []byte { - return convert.StringToBytes(s.key) -} - -func (s logSeriesID) len() int { - return s.byteLen -} - -// New creates a WAL instance in the specified path. -func New(path string, options *Options) (WAL, error) { - // Check configuration options. 
- walOptions := DefaultOptions - if options != nil { - fileSize := options.FileSize - if fileSize <= 0 { - fileSize = DefaultOptions.FileSize - } - bufferSize := options.BufferSize - if bufferSize <= 0 { - bufferSize = DefaultOptions.BufferSize - } - bufferBatchInterval := options.BufferBatchInterval - if bufferBatchInterval <= 0 { - bufferBatchInterval = DefaultOptions.BufferBatchInterval - } - walOptions = &Options{ - FileSize: fileSize, - BufferSize: bufferSize, - BufferBatchInterval: bufferBatchInterval, - SyncFlush: options.SyncFlush, - } - } - - // Initial WAL path. - path, err := filepath.Abs(path) - if err != nil { - return nil, errors.Wrap(err, "Can not get absolute path: "+path) - } - if err := os.MkdirAll(path, os.ModePerm); err != nil { - return nil, err - } - - writeCloser := run.NewChannelCloser() - flushCloser := run.NewChannelCloser() - chanGroupCloser := run.NewChannelGroupCloser(writeCloser, flushCloser) - log := &log{ - path: path, - options: *walOptions, - logger: logger.GetLogger(moduleName), - writeChannel: make(chan logRequest), - flushChannel: make(chan buffer, walOptions.FlushQueueSize), - bufferWriter: newBufferWriter(), - writeCloser: writeCloser, - flushCloser: flushCloser, - chanGroupCloser: chanGroupCloser, - buffer: buffer{ - timestampMap: make(map[logSeriesID][]time.Time), - valueMap: make(map[logSeriesID][]byte), - callbackMap: make(map[logSeriesID][]func([]byte, time.Time, []byte, error)), - count: 0, - }, - } - if err := log.load(); err != nil { - return nil, err - } - log.start() - - log.logger.Info().Str("path", path).Msg("WAL has been initialized") - return log, nil -} - -// Write a logging entity. -// It will return immediately when the data is written in the buffer, -// The callback function will be called when the entity is flushed on the persistent storage. 
-func (log *log) Write(seriesID []byte, timestamp time.Time, data []byte, callback func([]byte, time.Time, []byte, error)) { - if !log.writeCloser.AddSender() { - return - } - defer log.writeCloser.SenderDone() - - log.writeChannel <- logRequest{ - seriesID: seriesID, - timestamp: timestamp, - data: data, - callback: callback, - } -} - -// Read specified segment by SegmentID. -func (log *log) Read(segmentID SegmentID) (Segment, error) { - log.rwMutex.RLock() - defer log.rwMutex.RUnlock() - - segment := log.segmentMap[segmentID] - return segment, nil -} - -// ReadAllSegments reads all segments sorted by their creation time in ascending order. -func (log *log) ReadAllSegments() ([]Segment, error) { - log.rwMutex.RLock() - defer log.rwMutex.RUnlock() - - segments := make([]Segment, 0) - for _, segment := range log.segmentMap { - segments = append(segments, segment) - } - sort.Slice(segments, func(i, j int) bool { return segments[i].GetSegmentID() < segments[j].GetSegmentID() }) - return segments, nil -} - -// Rotate closes the open segment and opens a new one, returning the closed segment details. -func (log *log) Rotate() (Segment, error) { - log.rwMutex.Lock() - defer log.rwMutex.Unlock() - - newSegmentID := uint64(log.workSegment.segmentID) + 1 - if newSegmentID > maxSegmentID { - return nil, errors.New("Segment ID overflow uint64," + - " please delete all WAL segment files and restart") - } - if err := log.workSegment.file.Close(); err != nil { - return nil, errors.Wrap(err, "Close WAL segment error") - } - - // Create new segment. - oldWorkSegment := log.workSegment - segment := &segment{ - segmentID: SegmentID(newSegmentID), - path: filepath.Join(log.path, segmentName(newSegmentID)), - } - if err := segment.openFile(true); err != nil { - return nil, errors.Wrap(err, "Open WAL segment error") - } - log.workSegment = segment - - // Update segment information. 
- log.segmentMap[log.workSegment.segmentID] = log.workSegment - return oldWorkSegment, nil -} - -// Delete the specified segment. -func (log *log) Delete(segmentID SegmentID) error { - log.rwMutex.Lock() - defer log.rwMutex.Unlock() - - // Segment which will be deleted must be closed. - if segmentID == log.workSegment.segmentID { - return errors.New("Can not delete the segment which is working") - } - defer delete(log.segmentMap, segmentID) - err := os.Remove(log.segmentMap[segmentID].path) - if err == nil { - return nil - } - var pathErr *os.PathError - if errors.As(err, &pathErr) && errors.Is(pathErr.Err, syscall.ENOENT) { - return nil - } - return errors.Wrap(err, "Delete WAL segment error") -} - -// Close all of segments and stop WAL work. -func (log *log) Close() error { - var globalErr error - log.closerOnce.Do(func() { - log.logger.Info().Msg("Closing WAL...") - - log.chanGroupCloser.CloseThenWait() - - if err := log.flushBuffer(log.buffer); err != nil { - globalErr = multierr.Append(globalErr, err) - } - if err := log.workSegment.file.Close(); err != nil { - globalErr = multierr.Append(globalErr, err) - } - log.logger.Info().Msg("Closed WAL") - }) - return globalErr -} - -func (log *log) start() { - var initialTasks sync.WaitGroup - initialTasks.Add(2) - - go func() { - if !log.writeCloser.AddReceiver() { - panic("writeCloser already closed") - } - defer log.writeCloser.ReceiverDone() - - log.logger.Info().Msg("Start batch task...") - initialTasks.Done() - - bufferVolume := 0 - for { - timer := time.NewTimer(log.options.BufferBatchInterval) - select { - case request, chOpen := <-log.writeChannel: - if !chOpen { - timer.Stop() - log.logger.Info().Msg("Stop batch task when write-channel closed") - return - } - - log.buffer.write(request) - if log.logger.Debug().Enabled() { - log.logger.Debug().Msg("Write request to buffer. 
elements: " + strconv.Itoa(log.buffer.count)) - } - - bufferVolume += len(request.seriesID) + timestampVolumeLength + len(request.data) - if bufferVolume > log.options.BufferSize { - log.triggerFlushing() - bufferVolume = 0 - } - case <-timer.C: - if bufferVolume == 0 { - continue - } - log.triggerFlushing() - bufferVolume = 0 - case <-log.writeCloser.CloseNotify(): - timer.Stop() - log.logger.Info().Msg("Stop batch task when close notify") - return - } - timer.Stop() - } - }() - - go func() { - if !log.flushCloser.AddReceiver() { - panic("flushCloser already closed") - } - defer log.flushCloser.ReceiverDone() - - log.logger.Info().Msg("Start flush task...") - initialTasks.Done() - - for { - select { - case batch, chOpen := <-log.flushChannel: - if !chOpen { - log.logger.Info().Msg("Stop flush task when flush-channel closed") - return - } - - startTime := time.Now() - var err error - for i := 0; i < maxRetries; i++ { - if err = log.flushBuffer(batch); err != nil { - log.logger.Err(err).Msg("Flushing buffer failed. Retrying...") - time.Sleep(time.Second) - continue - } - break - } - if log.logger.Debug().Enabled() { - log.logger.Debug().Msg("Flushed buffer to WAL file. elements: " + - strconv.Itoa(batch.count) + ", cost: " + time.Since(startTime).String()) - } - - batch.notifyRequests(err) - case <-log.flushCloser.CloseNotify(): - log.logger.Info().Msg("Stop flush task when close notify") - return - } - } - }() - - initialTasks.Wait() - log.logger.Info().Msg("Started WAL") -} - -func (log *log) triggerFlushing() { - log.flushChannel <- log.buffer - if log.logger.Debug().Enabled() { - log.logger.Debug().Msg("Send buffer to flush-channel. 
elements: " + strconv.Itoa(log.buffer.count)) - } - - log.newBuffer() -} - -func (log *log) newBuffer() { - log.buffer = buffer{ - timestampMap: make(map[logSeriesID][]time.Time), - valueMap: make(map[logSeriesID][]byte), - callbackMap: make(map[logSeriesID][]func([]byte, time.Time, []byte, error)), - count: 0, - } -} - -func (log *log) flushBuffer(buffer buffer) error { - if buffer.count == 0 { - return nil - } - - var err error - if err = log.bufferWriter.Reset(); err != nil { - return errors.Wrap(err, "Reset buffer writer error") - } - - for seriesID, timestamps := range buffer.timestampMap { - log.bufferWriter.ResetSeries() - - if err = log.bufferWriter.WriteSeriesID(seriesID); err != nil { - return errors.Wrap(err, "Write seriesID error") - } - log.bufferWriter.WriteTimestamps(timestamps) - log.bufferWriter.WriteData(buffer.valueMap[seriesID]) - if err = log.bufferWriter.AddSeries(); err != nil { - return errors.Wrap(err, "Add series error") - } - } - - return log.writeWorkSegment(log.bufferWriter.Bytes()) -} - -func (log *log) writeWorkSegment(data []byte) error { - log.rwMutex.RLock() - defer log.rwMutex.RUnlock() - - // Write batch data to WAL segment file - if _, err := log.workSegment.file.Write(data); err != nil { - return errors.Wrap(err, "Write WAL segment file error, file: "+log.workSegment.path) - } - if log.options.SyncFlush { - if err := log.workSegment.file.Sync(); err != nil { - log.logger.Warn().Msg("Sync WAL segment file to disk failed, file: " + log.workSegment.path) - } - } - return nil -} - -func (log *log) load() error { - files, err := os.ReadDir(log.path) - if err != nil { - return errors.Wrap(err, "Can not read dir: "+log.path) - } - // Load all of WAL segments. 
- var workSegmentID SegmentID - log.segmentMap = make(map[SegmentID]*segment) - for _, file := range files { - name := file.Name() - segmentID, parsePathErr := parseSegmentID(name) - if parsePathErr != nil { - return errors.Wrap(parsePathErr, "Parse file name error, name: "+name) - } - if segmentID > uint64(workSegmentID) { - workSegmentID = SegmentID(segmentID) - } - segment := &segment{ - segmentID: SegmentID(segmentID), - path: filepath.Join(log.path, segmentName(segmentID)), - } - if err = segment.parseLogEntries(); err != nil { - return errors.Wrap(err, "Fail to parse log entries") - } - log.segmentMap[SegmentID(segmentID)] = segment - - if log.logger.Debug().Enabled() { - log.logger.Debug().Msg("Loaded segment file: " + segment.path) - } - } - - // If load first time. - if len(log.segmentMap) == 0 { - segmentID := SegmentID(1) - segment := &segment{ - segmentID: segmentID, - path: filepath.Join(log.path, segmentName(uint64(segmentID))), - } - log.segmentMap[segmentID] = segment - log.workSegment = segment - } else { - log.workSegment = log.segmentMap[workSegmentID] - } - if err = log.workSegment.openFile(false); err != nil { - return errors.Wrap(err, "Open WAL segment error, file: "+log.workSegment.path) - } - return nil -} - -func newBufferWriter() *bufferWriter { - return &bufferWriter{ - buf: bytes.NewBuffer([]byte{}), - timestampsBuf: bytes.NewBuffer([]byte{}), - dataBuf: make([]byte, 128), - } -} - -func (w *bufferWriter) Reset() error { - w.ResetSeries() - w.buf.Reset() - w.batchLen = 0 - - // pre-placement padding - err := w.writeBatchLength(0) - return err -} - -func (w *bufferWriter) ResetSeries() { - w.timestampsBuf.Reset() - w.dataLen = 0 - w.seriesID = nil - w.seriesCount = 0 -} - -func (w *bufferWriter) AddSeries() error { - seriesIDBytesLen := uint16(w.seriesID.len()) - timestampsBytesLen := uint16(w.timestampsBuf.Len()) - entryLen := seriesIDLength + uint64(seriesIDBytesLen) + seriesCountLength + timestampsBinaryLength + 
uint64(timestampsBytesLen) + uint64(w.dataLen) - - var err error - if err = w.writeEntryLength(entryLen); err != nil { - return err - } - if err = w.writeSeriesIDLength(seriesIDBytesLen); err != nil { - return err - } - if err = w.writeSeriesID(w.seriesID); err != nil { - return err - } - if err = w.writeSeriesCount(w.seriesCount); err != nil { - return err - } - if err = w.writeTimestampsLength(timestampsBytesLen); err != nil { - return err - } - if err = w.writeTimestamps(w.timestampsBuf.Bytes()); err != nil { - return err - } - if err = w.writeData(w.dataBuf[:w.dataLen]); err != nil { - return err - } - w.batchLen += entryLen - return nil -} - -func (w *bufferWriter) Bytes() []byte { - batchBytes := w.buf.Bytes() - batchLen := uint64(len(batchBytes)) - batchLength - return w.rewriteBatchLength(batchBytes, batchLen) -} - -func (w *bufferWriter) WriteSeriesID(seriesID logSeriesID) error { - w.seriesID = &seriesID - return nil -} - -func (w *bufferWriter) WriteTimestamps(timestamps []time.Time) { - timestampWriter := encoding.NewWriter() - timestampEncoder := encoding.NewXOREncoder(timestampWriter) - timestampWriter.Reset(w.timestampsBuf) - for _, timestamp := range timestamps { - timestampEncoder.Write(timeToUnixNano(timestamp)) - } - timestampWriter.Flush() - w.seriesCount = uint32(len(timestamps)) -} - -func (w *bufferWriter) WriteData(data []byte) { - maxEncodedLen := snappy.MaxEncodedLen(len(data)) - dataBufLen := len(w.dataBuf) - if dataBufLen < maxEncodedLen { - newCapacity := (dataBufLen * 2) - (dataBufLen / 2) - if newCapacity < maxEncodedLen { - newCapacity = maxEncodedLen - } - w.dataBuf = make([]byte, newCapacity) - } - snappyData := snappy.Encode(w.dataBuf, data) - w.dataLen = len(snappyData) -} - -func (w *bufferWriter) writeBatchLength(data uint64) error { - return writeUint64(w.buf, data) -} - -func (w *bufferWriter) rewriteBatchLength(b []byte, batchLen uint64) []byte { - _ = b[7] // early bounds check to guarantee safety of writes below - b[0] = 
byte(batchLen) - b[1] = byte(batchLen >> 8) - b[2] = byte(batchLen >> 16) - b[3] = byte(batchLen >> 24) - b[4] = byte(batchLen >> 32) - b[5] = byte(batchLen >> 40) - b[6] = byte(batchLen >> 48) - b[7] = byte(batchLen >> 56) - return b -} - -func (w *bufferWriter) writeEntryLength(data uint64) error { - return writeUint64(w.buf, data) -} - -func (w *bufferWriter) writeSeriesIDLength(data uint16) error { - return writeUint16(w.buf, data) -} - -func (w *bufferWriter) writeSeriesID(data *logSeriesID) error { - _, err := w.buf.WriteString(data.string()) - return err -} - -func (w *bufferWriter) writeSeriesCount(data uint32) error { - return writeUint32(w.buf, data) -} - -func (w *bufferWriter) writeTimestampsLength(data uint16) error { - return writeUint16(w.buf, data) -} - -func (w *bufferWriter) writeTimestamps(data []byte) error { - _, err := w.buf.Write(data) - return err -} - -func (w *bufferWriter) writeData(data []byte) error { - _, err := w.buf.Write(data) - return err -} - -func (segment *segment) GetSegmentID() SegmentID { - return segment.segmentID -} - -func (segment *segment) GetLogEntries() []LogEntry { - return segment.logEntries -} - -func (segment *segment) openFile(overwrite bool) error { - var err error - if overwrite { - segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) - } else { - segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm) - } - return err -} - -func (segment *segment) parseLogEntries() error { - segmentBytes, err := os.ReadFile(segment.path) - if err != nil { - return errors.Wrap(err, "Read WAL segment failed, path: "+segment.path) - } - - var logEntries []LogEntry - var data []byte - var batchLen uint64 - var entryLen uint64 - var seriesIDLen uint16 - var seriesID []byte - var seriesCount uint32 - var timestampsBinaryLen uint16 - var entryEndPosition uint64 - - var oldPos uint64 - var pos uint64 - parseNextBatchFlag := true - segmentBytesLen := 
uint64(len(segmentBytes)) - - for { - if parseNextBatchFlag { - if segmentBytesLen <= batchLength { - break - } - data = segmentBytes[pos : pos+batchLength] - batchLen, err = segment.parseBatchLength(data) - if err != nil { - return errors.Wrap(err, "Parse batch length error") - } - - if segmentBytesLen <= batchLen { - break - } - - pos += batchLength - oldPos = pos - parseNextBatchFlag = false - } - - // Parse entryLength. - data = segmentBytes[pos : pos+entryLength] - - entryLen, err = segment.parseEntryLength(data) - if err != nil { - return errors.Wrap(err, "Parse entry length error") - } - pos += entryLength - - // Mark entry end-position - entryEndPosition = pos + entryLen - if segmentBytesLen < entryEndPosition { - break - } - - // Parse seriesIDLength. - data = segmentBytes[pos : pos+seriesIDLength] - seriesIDLen, err = segment.parseSeriesIDLength(data) - if err != nil { - return errors.Wrap(err, "Parse seriesID length error") - } - pos += seriesIDLength - - // Parse seriesID. - data = segmentBytes[pos : pos+uint64(seriesIDLen)] - seriesID = segment.parseSeriesID(data) - pos += uint64(seriesIDLen) - - // Parse series count. - data = segmentBytes[pos : pos+seriesCountLength] - seriesCount, err = segment.parseSeriesCountLength(data) - if err != nil { - return errors.Wrap(err, "Parse series count length error") - } - pos += seriesCountLength - - // Parse timestamps compression binary. - data = segmentBytes[pos : pos+timestampsBinaryLength] - timestampsBinaryLen, err = segment.parseTimestampsLength(data) - if err != nil { - return errors.Wrap(err, "Parse timestamps length error") - } - pos += timestampsBinaryLength - data = segmentBytes[pos : pos+uint64(timestampsBinaryLen)] - var timestamps []time.Time - timestamps, err = segment.parseTimestamps(seriesCount, data) - if err != nil { - return errors.Wrap(err, "Parse timestamps compression binary error") - } - pos += uint64(timestampsBinaryLen) - - // Parse values compression binary. 
- data = segmentBytes[pos:entryEndPosition] - values, err := segment.parseValuesBinary(data) - if err != nil { - return errors.Wrap(err, "Parse values compression binary error") - } - if values.Len() != int(seriesCount) { - return errors.New("values binary items not match series count. series count: " + - strconv.Itoa(int(seriesCount)) + ", values binary items: " + strconv.Itoa(values.Len())) - } - pos = entryEndPosition - - logEntry := &logEntry{ - entryLength: entryLen, - seriesID: seriesID, - count: seriesCount, - timestamps: timestamps, - values: values, - } - logEntries = append(logEntries, logEntry) - - if pos == segmentBytesLen { - break - } - if pos-oldPos == batchLen { - parseNextBatchFlag = true - } - } - segment.logEntries = logEntries - return nil -} - -func (segment *segment) parseBatchLength(data []byte) (uint64, error) { - var batchLen uint64 - buf := bytes.NewBuffer(data) - if err := binary.Read(buf, binary.LittleEndian, &batchLen); err != nil { - return 0, err - } - return batchLen, nil -} - -func (segment *segment) parseEntryLength(data []byte) (uint64, error) { - var entryLen uint64 - buf := bytes.NewBuffer(data) - if err := binary.Read(buf, binary.LittleEndian, &entryLen); err != nil { - return 0, err - } - return entryLen, nil -} - -func (segment *segment) parseSeriesIDLength(data []byte) (uint16, error) { - var seriesIDLen uint16 - buf := bytes.NewBuffer(data) - if err := binary.Read(buf, binary.LittleEndian, &seriesIDLen); err != nil { - return 0, err - } - return seriesIDLen, nil -} - -func (segment *segment) parseSeriesID(data []byte) []byte { - return newLogSeriesID(data).bytes() -} - -func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) { - var seriesCount uint32 - buf := bytes.NewBuffer(data) - if err := binary.Read(buf, binary.LittleEndian, &seriesCount); err != nil { - return 0, err - } - return seriesCount, nil -} - -func (segment *segment) parseTimestampsLength(data []byte) (uint16, error) { - var timestampsLen 
uint16 - buf := bytes.NewBuffer(data) - if err := binary.Read(buf, binary.LittleEndian, &timestampsLen); err != nil { - return 0, err - } - return timestampsLen, nil -} - -func (segment *segment) parseTimestamps(seriesCount uint32, data []byte) ([]time.Time, error) { - timestampReader := encoding.NewReader(bytes.NewReader(data)) - timestampDecoder := encoding.NewXORDecoder(timestampReader) - var timestamps []time.Time - for i := 0; i < int(seriesCount); i++ { - if !timestampDecoder.Next() { - return nil, errors.New("Timestamps length not match series count") - } - timestamps = append(timestamps, unixNanoToTime(timestampDecoder.Value())) - } - return timestamps, nil -} - -func (segment *segment) parseValuesBinary(data []byte) (*list.List, error) { - var err error - if data, err = snappy.Decode(nil, data); err != nil { - return nil, errors.Wrap(err, "Decode values compression binary error") - } - - values := list.New() - position := 0 - for { - nextPosition, value := readValuesBinary(data, position, valuesBinaryLength) - if value == nil { - break - } - values.PushBack(value) - position = nextPosition - } - return values, nil -} - -func (logEntry *logEntry) GetSeriesID() []byte { - return logEntry.seriesID -} - -func (logEntry *logEntry) GetTimestamps() []time.Time { - return logEntry.timestamps -} - -func (logEntry *logEntry) GetValues() *list.List { - return logEntry.values -} - -func (buffer *buffer) write(request logRequest) { - key := newLogSeriesID(request.seriesID) - buffer.timestampMap[key] = append(buffer.timestampMap[key], request.timestamp) - - // Value item: binary-length(2-bytes) + binary data(n-bytes) - binaryLen := uint16(len(request.data)) - buffer.valueMap[key] = append(buffer.valueMap[key], byte(binaryLen), byte(binaryLen>>8)) - buffer.valueMap[key] = append(buffer.valueMap[key], request.data...) 
- - buffer.callbackMap[key] = append(buffer.callbackMap[key], request.callback) - buffer.count++ -} - -func (buffer *buffer) notifyRequests(err error) { - var timestamps []time.Time - var values []byte - var valueItem []byte - var valuePos int - for seriesID, callbacks := range buffer.callbackMap { - timestamps = buffer.timestampMap[seriesID] - values = buffer.valueMap[seriesID] - valuePos = 0 - for index, callback := range callbacks { - valuePos, valueItem = readValuesBinary(values, valuePos, valuesBinaryLength) - if callback != nil { - buffer.runningCallback(func() { - callback(seriesID.bytes(), timestamps[index], valueItem, err) - }) - } - } - } -} - -func (buffer *buffer) runningCallback(callback func()) { - defer func() { - _ = recover() - }() - callback() -} - -func segmentName(segmentID uint64) string { - return fmt.Sprintf("%v%016x%v", segmentNamePrefix, segmentID, segmentNameSuffix) -} - -// Parse segment ID. segmentName example: seg0000000000000001.wal. -func parseSegmentID(segmentName string) (uint64, error) { - _ = segmentName[22:] // early bounds check to guarantee safety of reads below - if !strings.HasPrefix(segmentName, segmentNamePrefix) { - return 0, errors.New("Invalid segment name: " + segmentName) - } - if !strings.HasSuffix(segmentName, segmentNameSuffix) { - return 0, errors.New("Invalid segment name: " + segmentName) - } - return strconv.ParseUint(segmentName[3:19], 16, 64) -} - -func readValuesBinary(raw []byte, position int, offsetLen int) (int, []byte) { - if position == len(raw) { - return position, nil - } - - data := raw[position : position+offsetLen] - binaryLen := bytesToUint16(data) - position += offsetLen - - data = raw[position : position+int(binaryLen)] - position += int(binaryLen) - return position, data -} - -func writeUint16(buffer *bytes.Buffer, data uint16) error { - var err error - if err = buffer.WriteByte(byte(data)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 8)); err != nil { - return err - } 
- return err -} - -func writeUint32(buffer *bytes.Buffer, data uint32) error { - var err error - if err = buffer.WriteByte(byte(data)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 8)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 16)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 24)); err != nil { - return err - } - return err -} - -func writeUint64(buffer *bytes.Buffer, data uint64) error { - var err error - if err = buffer.WriteByte(byte(data)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 8)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 16)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 24)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 32)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 40)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 48)); err != nil { - return err - } - if err = buffer.WriteByte(byte(data >> 56)); err != nil { - return err - } - return err -} - -func bytesToUint16(buf []byte) uint16 { - return binary.LittleEndian.Uint16(buf) -} - -func timeToUnixNano(time time.Time) uint64 { - return uint64(time.UnixNano()) -} - -func unixNanoToTime(unixNano uint64) time.Time { - return time.Unix(0, int64(unixNano)) -} diff --git a/pkg/wal/wal_benchmark_test.go b/pkg/wal/wal_benchmark_test.go deleted file mode 100644 index ba9570f2..00000000 --- a/pkg/wal/wal_benchmark_test.go +++ /dev/null @@ -1,450 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package wal (Write-ahead logging) is an independent component to ensure data reliability. -package wal - -import ( - "crypto/rand" - "fmt" - "math/big" - "os" - "path/filepath" - "testing" - "time" - - "github.com/apache/skywalking-banyandb/pkg/logger" -) - -var ( - path = "benchmark" - baseTime = time.Now().UnixMilli() - data = newBinaryData() - dataLen = len(data) - seriesID1 = newSeriesIDList(1) - seriesID20 = newSeriesIDList(20) - seriesID100 = newSeriesIDList(100) - seriesID500 = newSeriesIDList(500) - seriesID1000 = newSeriesIDList(1000) - callback = func(seriesID []byte, t time.Time, bytes []byte, err error) {} -) - -func Benchmark_SeriesID_1(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true}) - defer closeWAL(wal) - - seriesID := seriesID1 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_20(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true}) - defer closeWAL(wal) - - seriesID := seriesID20 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_100(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true}) - defer closeWAL(wal) - - seriesID := seriesID100 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - 
wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_500(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true}) - defer closeWAL(wal) - - seriesID := seriesID500 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 64}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 128}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 512}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func 
Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) { - wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024 * 2}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 128, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 512, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func 
Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 1024, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - rotateSize := 1024 * 1024 * 16 // 16MB - rotateChan := make(chan struct{}) - rotateMessage := struct{}{} - seriesIDVolume := 16 - timeVolume := 8 - var logVolume int - var binaryData []byte - - b.ResetTimer() - go func() { - for range rotateChan { - segment, err := wal.Rotate() - if err != nil { - panic(err) - } - wal.Delete(segment.GetSegmentID()) - } - }() - for i := 0; i < b.N; i++ { - binaryData = data[i%dataLen].binary - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback) - - logVolume += seriesIDVolume + timeVolume + len(binaryData) - if logVolume >= rotateSize { - rotateChan <- rotateMessage - logVolume = 0 - } - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - rotateSize := 1024 * 1024 * 32 // 32MB - 
rotateChan := make(chan struct{}) - rotateMessage := struct{}{} - seriesIDVolume := 16 - timeVolume := 8 - var logVolume int - var binaryData []byte - - b.ResetTimer() - go func() { - for range rotateChan { - segment, err := wal.Rotate() - if err != nil { - panic(err) - } - wal.Delete(segment.GetSegmentID()) - } - }() - for i := 0; i < b.N; i++ { - binaryData = data[i%dataLen].binary - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback) - - logVolume += seriesIDVolume + timeVolume + len(binaryData) - if logVolume >= rotateSize { - rotateChan <- rotateMessage - logVolume = 0 - } - } - b.StopTimer() -} - -func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B) { - wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false}) - defer closeWAL(wal) - - seriesID := seriesID1000 - seriesIDLen := len(seriesID) - - rotateSize := 1024 * 1024 * 64 // 64MB - rotateChan := make(chan struct{}) - rotateMessage := struct{}{} - seriesIDVolume := 16 - timeVolume := 8 - var logVolume int - var binaryData []byte - - b.ResetTimer() - go func() { - for range rotateChan { - segment, err := wal.Rotate() - if err != nil { - panic(err) - } - wal.Delete(segment.GetSegmentID()) - } - }() - for i := 0; i < b.N; i++ { - binaryData = data[i%dataLen].binary - wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback) - - logVolume += seriesIDVolume + timeVolume + len(binaryData) - if logVolume >= rotateSize { - rotateChan <- rotateMessage - logVolume = 0 - } - } - b.StopTimer() -} - -func newWAL(options *Options) WAL { - os.RemoveAll(path) - - logger.Init(logger.Logging{Level: "error"}) - logPath, _ := filepath.Abs(path) - if options == nil { - options = &Options{ - BufferSize: 1024 * 64, // 64KB - BufferBatchInterval: 3 * time.Second, - } - } - wal, err := New(logPath, options) - if err != nil { - panic(err) - } - return wal -} - -func closeWAL(wal WAL) { - err := wal.Close() - if err != nil { - 
panic(err) - } - - err = os.RemoveAll(path) - if err != nil { - panic(err) - } -} - -type SeriesID struct { - key []byte -} - -func newSeriesIDList(series int) []SeriesID { - var seriesIDSet []SeriesID - for i := 0; i < series; i++ { - seriesID := SeriesID{key: []byte(fmt.Sprintf("series-%d", i))} - seriesIDSet = append(seriesIDSet, seriesID) - } - return seriesIDSet -} - -type Data struct { - binary []byte -} - -func newBinaryData() []Data { - var data []Data - for i := 0; i < 2000; i++ { - data = append(data, Data{binary: []byte(randStr())}) - } - return data -} - -func randStr() string { - bytes := []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"',:{}[]") - result := []byte{} - - var err error - var strLengthLower int64 = 200 - var strLengthUpper int64 = 1024 - var strLength *big.Int - var strRandIndex *big.Int - strLength, err = rand.Int(rand.Reader, big.NewInt(strLengthUpper)) - if err != nil { - panic(err) - } - if strLength.Int64() < strLengthLower { - strLength = big.NewInt(strLengthLower) - } - for i := 0; i < int(strLength.Int64()); i++ { - strRandIndex, err = rand.Int(rand.Reader, big.NewInt(int64(len(bytes)))) - if err != nil { - panic(err) - } - - result = append(result, bytes[strRandIndex.Int64()]) - } - return string(result) -} diff --git a/pkg/wal/wal_test.go b/pkg/wal/wal_test.go deleted file mode 100644 index d807f970..00000000 --- a/pkg/wal/wal_test.go +++ /dev/null @@ -1,242 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package version can be used to implement embedding versioning details from -// git branches and tags into the binary importing this package. -package wal_test - -import ( - "bytes" - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" - "github.com/onsi/gomega/gleak" - - "github.com/apache/skywalking-banyandb/pkg/test/flags" - "github.com/apache/skywalking-banyandb/pkg/wal" -) - -var _ = ginkgo.Describe("WAL", func() { - var ( - path string - log wal.WAL - options *wal.Options - goods []gleak.Goroutine - ) - ginkgo.BeforeEach(func() { - options = &wal.Options{ - BufferSize: 1024, // 1KB - BufferBatchInterval: 1 * time.Second, - } - goods = gleak.Goroutines() - }) - ginkgo.AfterEach(func() { - err := os.RemoveAll(path) - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) - }) - - ginkgo.Context("Write and Read", func() { - ginkgo.BeforeEach(func() { - var err error - path, err = filepath.Abs("test1") - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - options = &wal.Options{ - BufferSize: 1024, // 1KB - BufferBatchInterval: 1 * time.Second, - } - log, err = wal.New(path, options) - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - }) - - ginkgo.AfterEach(func() { - err := log.Close() - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - }) - - ginkgo.It("should write and read data correctly", func() { - seriesIDCount := 100 - seriesIDElementCount := 20 - writeLogCount := seriesIDCount * 
seriesIDElementCount - - var wg sync.WaitGroup - wg.Add(writeLogCount) - baseTime := time.Now() - for i := 0; i < seriesIDCount; i++ { - seriesID := []byte(fmt.Sprintf("series-%d", i)) - go func() { - for j := 0; j < seriesIDElementCount; j++ { - timestamp := time.UnixMilli(baseTime.UnixMilli() + int64(j)) - value := []byte(fmt.Sprintf("value-%d", j)) - callback := func(seriesID []byte, t time.Time, bytes []byte, err error) { - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - - wg.Done() - } - log.Write(seriesID, timestamp, value, callback) - } - }() - } - wg.Wait() - - err := log.Close() - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - log, err = wal.New(path, options) - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - segments, err := log.ReadAllSegments() - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - readLogCount := 0 - - for _, segment := range segments { - entries := segment.GetLogEntries() - for _, entity := range entries { - seriesID := entity.GetSeriesID() - // Check seriesID - gomega.Expect(seriesID != nil).To(gomega.BeTrue()) - - timestamps := entity.GetTimestamps() - values := entity.GetValues() - var timestamp time.Time - element := values.Front() - for i := 0; i < len(timestamps); i++ { - timestamp = timestamps[i] - - // Check timestamp - gomega.Expect(timestamp.UnixMilli() >= baseTime.UnixMilli()).To(gomega.BeTrue()) - gomega.Expect(timestamp.UnixMilli() <= baseTime.UnixMilli()+int64(seriesIDElementCount)).To(gomega.BeTrue()) - - // Check binary - elementSequence := timestamp.UnixMilli() - baseTime.UnixMilli() - value := element.Value.([]byte) - gomega.Expect(bytes.Equal([]byte(fmt.Sprintf("value-%d", elementSequence)), value)).To(gomega.BeTrue()) - - readLogCount++ - element = element.Next() - } - } - } - - // Check write/read log count - gomega.Expect(writeLogCount == readLogCount).To(gomega.BeTrue()) - }) - }) - - ginkgo.Context("Rotate", func() { - ginkgo.BeforeEach(func() { - var err error - path, err = filepath.Abs("test2") - 
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-			options = &wal.Options{
-				BufferSize: 1,
-			}
-			log, err = wal.New(path, options)
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-		})
-
-		ginkgo.AfterEach(func() {
-			err := log.Close()
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-		})
-
-		ginkgo.It("should rotate correctly", func() {
-			var wg sync.WaitGroup
-			writeLogCount := 3
-
-			wg.Add(writeLogCount)
-			expectSegments := make(map[wal.SegmentID][]byte)
-			for i := 0; i < writeLogCount; i++ {
-				seriesID := []byte(fmt.Sprintf("series-%d", i))
-				timestamp := time.Now()
-				value := []byte(fmt.Sprintf("value-%d", i))
-				callback := func(seriesID []byte, t time.Time, bytes []byte, err error) {
-					gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-					// Rotate
-					segment, err := log.Rotate()
-					gomega.Expect(err).ToNot(gomega.HaveOccurred())
-					expectSegments[segment.GetSegmentID()] = seriesID
-
-					wg.Done()
-				}
-				log.Write(seriesID, timestamp, value, callback)
-			}
-			wg.Wait()
-
-			err := log.Close()
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-			log, err = wal.New(path, options)
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-			// Check segment files
-			gomega.Expect(len(expectSegments) == writeLogCount).To(gomega.BeTrue())
-			for segmentID, seriesID := range expectSegments {
-				segment, err := log.Read(segmentID)
-				gomega.Expect(err).ToNot(gomega.HaveOccurred())
-				entries := segment.GetLogEntries()
-				gomega.Expect(len(entries) == 1).To(gomega.BeTrue())
-				gomega.Expect(bytes.Equal(entries[0].GetSeriesID(), seriesID)).To(gomega.BeTrue())
-			}
-		})
-	})
-
-	ginkgo.Context("Delete", func() {
-		ginkgo.BeforeEach(func() {
-			var err error
-			path, err = filepath.Abs("test3")
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-			log, err = wal.New(path, options)
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-		})
-
-		ginkgo.AfterEach(func() {
-			err := log.Close()
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-		})
-
-		ginkgo.It("should delete correctly", func() {
-			var err error
-
-			segments, err := log.ReadAllSegments()
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-			gomega.Expect(len(segments) == 1).To(gomega.BeTrue())
-
-			segment, err := log.Rotate()
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-			segments, err = log.ReadAllSegments()
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-			gomega.Expect(len(segments) == 2).To(gomega.BeTrue())
-
-			err = log.Delete(segment.GetSegmentID())
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-			log.Close()
-			log, err = wal.New(path, options)
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-			segments, err = log.ReadAllSegments()
-			gomega.Expect(err).ToNot(gomega.HaveOccurred())
-
-			// Check segment files
-			gomega.Expect(len(segments) == 1).To(gomega.BeTrue())
-		})
-	})
-})
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 7379151c..63e962d5 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -59,7 +59,8 @@ services:
       - sw_agent:/skywalking-java-agent
 
   oap:
-    image: "ghcr.io/apache/skywalking/oap:${SW_OAP_COMMIT}"
+    # TODO: use the main repo image once v0.6.0 is released and merged into the main repo
+    image: "docker.io/hanahmily/oap:${SW_OAP_COMMIT}"
    expose:
      - 11800
      - 12800
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index c7744d4c..e0dc9ddc 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -25,5 +25,5 @@ SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5
 SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
 SW_CTL_COMMIT=6b2eb0011e38b630db6af7203db215806bd141ed
-SW_OAP_COMMIT=e5c308d4358d5b02a658b8ad7f153aec99b8e63a
+SW_OAP_COMMIT=cec72540e712e0c9a56c18171fb3e98897b27f11
 SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
diff --git a/test/stress/istio/testdata/groups/group.yaml b/test/stress/istio/testdata/groups/group.yaml
index 45e403cc..1031c1a2 100644
--- a/test/stress/istio/testdata/groups/group.yaml
+++ b/test/stress/istio/testdata/groups/group.yaml
@@ -6,9 +6,6 @@ modRevision: "0"
   name: measure-default
 resourceOpts:
-  blockInterval:
-    num: 192
-    unit: UNIT_HOUR
   segmentInterval:
     num: 8
     unit: UNIT_DAY
@@ -25,9 +22,6 @@ modRevision: "0"
   name: measure-minute
 resourceOpts:
-  blockInterval:
-    num: 4
-    unit: UNIT_HOUR
   segmentInterval:
     num: 1
     unit: UNIT_DAY
@@ -44,9 +38,6 @@ modRevision: "0"
   name: stream-browser_error_log
 resourceOpts:
-  blockInterval:
-    num: 4
-    unit: UNIT_HOUR
   segmentInterval:
     num: 1
     unit: UNIT_DAY
@@ -63,9 +54,6 @@ modRevision: "0"
   name: stream-default
 resourceOpts:
-  blockInterval:
-    num: 24
-    unit: UNIT_HOUR
   segmentInterval:
     num: 1
     unit: UNIT_DAY
@@ -82,9 +70,6 @@ modRevision: "0"
   name: stream-log
 resourceOpts:
-  blockInterval:
-    num: 4
-    unit: UNIT_HOUR
   segmentInterval:
     num: 1
     unit: UNIT_DAY
@@ -101,9 +86,6 @@ modRevision: "0"
   name: stream-segment
 resourceOpts:
-  blockInterval:
-    num: 4
-    unit: UNIT_HOUR
   segmentInterval:
     num: 1
     unit: UNIT_DAY
@@ -120,9 +102,6 @@ modRevision: "0"
   name: stream-zipkin_span
 resourceOpts:
-  blockInterval:
-    num: 4
-    unit: UNIT_HOUR
   segmentInterval:
     num: 1
     unit: UNIT_DAY
diff --git a/test/stress/trace/docker-compose.yaml b/test/stress/trace/docker-compose.yaml
index 7ea06551..05cfff66 100644
--- a/test/stress/trace/docker-compose.yaml
+++ b/test/stress/trace/docker-compose.yaml
@@ -53,7 +53,8 @@ services:
     extends:
       file: ../../docker/base-compose.yml
       service: oap
-    image: hanahmily/data-generator:latest
+    # TODO: use the main repo image once v0.6.0 is released and merged into the main repo
+    image: "docker.io/hanahmily/data-generator:${SW_OAP_COMMIT}"
     environment:
       SW_STORAGE: banyandb
     ports:
diff --git a/test/stress/trace/env b/test/stress/trace/env
index 94bce219..3acd4a0a 100644
--- a/test/stress/trace/env
+++ b/test/stress/trace/env
@@ -14,17 +14,4 @@
 # limitations under the License.
-SW_AGENT_JAVA_COMMIT=3f88d735ba2bfd1196aff946502447d4b14450c8
-SW_AGENT_SATELLITE_COMMIT=ea27a3f4e126a24775fe12e2aa2695bcb23d99c3
-SW_AGENT_NGINX_LUA_COMMIT=c3cee4841798a147d83b96a10914d4ac0e11d0aa
-SW_AGENT_NODEJS_COMMIT=2e7560518aff846befd4d6bc815fe5e38c704a11
-SW_AGENT_GO_COMMIT=4af380c2db6243106b0fc650b6003ce3b3eb82a0
-SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
-SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
-SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
-SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5
-SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
-SW_CTL_COMMIT=6b2eb0011e38b630db6af7203db215806bd141ed
-
-SW_OAP_COMMIT=e5c308d4358d5b02a658b8ad7f153aec99b8e63a
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
+SW_OAP_COMMIT=cec72540e712e0c9a56c18171fb3e98897b27f11
diff --git a/ui/src/components/Aside/index.vue b/ui/src/components/Aside/index.vue
index 00a05663..29a8509e 100644
--- a/ui/src/components/Aside/index.vue
+++ b/ui/src/components/Aside/index.vue
@@ -592,8 +592,6 @@ function openEditGroup() {
   data.groupForm.name = data.groupLists[data.clickIndex].metadata.name
   data.groupForm.catalog = data.groupLists[data.clickIndex].catalog
   data.groupForm.shardNum = data.groupLists[data.clickIndex].resourceOpts?.shardNum
-  data.groupForm.blockIntervalUnit = data.groupLists[data.clickIndex].resourceOpts?.blockInterval?.unit
-  data.groupForm.blockIntervalNum = data.groupLists[data.clickIndex].resourceOpts?.blockInterval?.num
   data.groupForm.segmentIntervalUnit = data.groupLists[data.clickIndex].resourceOpts?.segmentInterval?.unit
   data.groupForm.segmentIntervalNum = data.groupLists[data.clickIndex].resourceOpts?.segmentInterval?.num
   data.groupForm.ttlUnit = data.groupLists[data.clickIndex].resourceOpts?.ttl?.unit
@@ -741,10 +739,6 @@ function createGroupFunction() {
     catalog: data.groupForm.catalog,
     resourceOpts: {
       shardNum: data.groupForm.shardNum,
-      blockInterval: {
-        unit: data.groupForm.blockIntervalUnit,
-        num: data.groupForm.blockIntervalNum
-      },
       segmentInterval: {
         unit: data.groupForm.segmentIntervalUnit,
         num: data.groupForm.segmentIntervalNum
@@ -788,10 +782,6 @@ function editGroupFunction() {
     catalog: data.groupForm.catalog,
     resourceOpts: {
       shardNum: data.groupForm.shardNum,
-      blockInterval: {
-        unit: data.groupForm.blockIntervalUnit,
-        num: data.groupForm.blockIntervalNum
-      },
       segmentInterval: {
         unit: data.groupForm.segmentIntervalUnit,
         num: data.groupForm.segmentIntervalNum