This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new ffeb4077 Add SingleIOBuf for efficient flatbuffers
serialization/deserialization (#3062)
ffeb4077 is described below
commit ffeb40772f0eac3772176519ff6aaab80ff2d91e
Author: Q1ngbo <[email protected]>
AuthorDate: Thu Aug 28 14:34:16 2025 +0800
Add SingleIOBuf for efficient flatbuffers serialization/deserialization
(#3062)
* Add SingleIOBuf for efficient flatbuffers serialization/deserialization
Co-authored-by: lishuo02<[email protected]>
Co-authored-by: xulei25 <[email protected]>
* Add SingleIOBuf for efficient flatbuffers serialization/deserialization
Co-authored-by: lishuo02<[email protected]>
Co-authored-by: xulei25 <[email protected]>
Co-authored-by: qiqingbo<[email protected]>
---------
Co-authored-by: xulei25 <[email protected]>
---
BUILD.bazel | 1 +
CMakeLists.txt | 1 +
Makefile | 1 +
src/butil/iobuf.cpp | 211 ++++-----------------------------
src/butil/iobuf.h | 4 +-
src/butil/iobuf_inl.h | 224 ++++++++++++++++++++++++++++++++++-
src/butil/single_iobuf.cpp | 285 +++++++++++++++++++++++++++++++++++++++++++++
src/butil/single_iobuf.h | 109 +++++++++++++++++
test/iobuf_unittest.cpp | 56 +++++++++
9 files changed, 704 insertions(+), 188 deletions(-)
diff --git a/BUILD.bazel b/BUILD.bazel
index d2b3d946..b4f57d94 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -226,6 +226,7 @@ BUTIL_SRCS = [
"src/butil/crc32c.cc",
"src/butil/containers/case_ignored_flat_map.cpp",
"src/butil/iobuf.cpp",
+ "src/butil/single_iobuf.cpp",
"src/butil/iobuf_profiler.cpp",
"src/butil/binary_printer.cpp",
"src/butil/recordio.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 92f7114b..45fcf611 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -475,6 +475,7 @@ set(BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/src/butil/crc32c.cc
${PROJECT_SOURCE_DIR}/src/butil/containers/case_ignored_flat_map.cpp
${PROJECT_SOURCE_DIR}/src/butil/iobuf.cpp
+ ${PROJECT_SOURCE_DIR}/src/butil/single_iobuf.cpp
${PROJECT_SOURCE_DIR}/src/butil/iobuf_profiler.cpp
${PROJECT_SOURCE_DIR}/src/butil/binary_printer.cpp
${PROJECT_SOURCE_DIR}/src/butil/recordio.cc
diff --git a/Makefile b/Makefile
index fce3d82d..16a10ae2 100644
--- a/Makefile
+++ b/Makefile
@@ -163,6 +163,7 @@ BUTIL_SOURCES = \
src/butil/crc32c.cc \
src/butil/containers/case_ignored_flat_map.cpp \
src/butil/iobuf.cpp \
+ src/butil/single_iobuf.cpp \
src/butil/iobuf_profiler.cpp \
src/butil/binary_printer.cpp \
src/butil/recordio.cc \
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index 208eedd2..f1f3d023 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -183,6 +183,20 @@ butil::static_atomic<size_t> g_nblock =
BUTIL_STATIC_ATOMIC_INIT(0);
butil::static_atomic<size_t> g_blockmem = BUTIL_STATIC_ATOMIC_INIT(0);
butil::static_atomic<size_t> g_newbigview = BUTIL_STATIC_ATOMIC_INIT(0);
+void inc_g_nblock() {  // Adds one to g_nblock; called from the IOBuf::Block constructor in iobuf_inl.h.
+ g_nblock.fetch_add(1, butil::memory_order_relaxed);
+}
+void dec_g_nblock() {  // Subtracts one from g_nblock; called from IOBuf::Block::dec_ref() when a non-user-data block is destroyed.
+ g_nblock.fetch_sub(1, butil::memory_order_relaxed);
+}
+
+void inc_g_blockmem() {  // NOTE(review): adds 1, whereas the Block constructor removed by this patch added `data_size + sizeof(Block)` bytes; g_blockmem now counts blocks, not bytes. Likely needs a size parameter.
+ g_blockmem.fetch_add(1, butil::memory_order_relaxed);
+}
+void dec_g_blockmem() {  // NOTE(review): subtracts 1, whereas the dec_ref() removed by this patch subtracted `cap + sizeof(Block)` bytes; keep in step with inc_g_blockmem().
+ g_blockmem.fetch_sub(1, butil::memory_order_relaxed);
+}
+
} // namespace iobuf
size_t IOBuf::block_count() {
@@ -197,133 +211,6 @@ size_t IOBuf::new_bigview_count() {
return iobuf::g_newbigview.load(butil::memory_order_relaxed);
}
-const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 1 << 0;
-const uint16_t IOBUF_BLOCK_FLAGS_SAMPLED = 1 << 1;
-using UserDataDeleter = std::function<void(void*)>;
-
-struct UserDataExtension {
- UserDataDeleter deleter;
-};
-
-struct IOBuf::Block {
- butil::atomic<int> nshared;
- uint16_t flags;
- uint16_t abi_check; // original cap, never be zero.
- uint32_t size;
- uint32_t cap;
- // When flag is 0, portal_next is valid.
- // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data_meta is valid.
- union {
- Block* portal_next;
- uint64_t data_meta;
- } u;
- // When flag is 0, data points to `size` bytes starting at
`(char*)this+sizeof(Block)'
- // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data points to the
user data and
- // the deleter is put in UserDataExtension at `(char*)this+sizeof(Block)'
- char* data;
-
- Block(char* data_in, uint32_t data_size)
- : nshared(1)
- , flags(0)
- , abi_check(0)
- , size(0)
- , cap(data_size)
- , u({NULL})
- , data(data_in) {
- iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
- iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
- butil::memory_order_relaxed);
- if (is_samplable()) {
- SubmitIOBufSample(this, 1);
- }
- }
-
- Block(char* data_in, uint32_t data_size, UserDataDeleter deleter)
- : nshared(1)
- , flags(IOBUF_BLOCK_FLAGS_USER_DATA)
- , abi_check(0)
- , size(data_size)
- , cap(data_size)
- , u({0})
- , data(data_in) {
- auto ext = new (get_user_data_extension()) UserDataExtension();
- ext->deleter = std::move(deleter);
- if (is_samplable()) {
- SubmitIOBufSample(this, 1);
- }
- }
-
- // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
- UserDataExtension* get_user_data_extension() {
- char* p = (char*)this;
- return (UserDataExtension*)(p + sizeof(Block));
- }
-
- inline void check_abi() {
-#ifndef NDEBUG
- if (abi_check != 0) {
- LOG(FATAL) << "Your program seems to wrongly contain two "
- "ABI-incompatible implementations of IOBuf";
- }
-#endif
- }
-
- void inc_ref() {
- check_abi();
- nshared.fetch_add(1, butil::memory_order_relaxed);
- if (sampled()) {
- SubmitIOBufSample(this, 1);
- }
- }
-
- void dec_ref() {
- check_abi();
- if (sampled()) {
- SubmitIOBufSample(this, -1);
- }
- if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
- butil::atomic_thread_fence(butil::memory_order_acquire);
- if (!is_user_data()) {
- iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
- iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
- butil::memory_order_relaxed);
- this->~Block();
- iobuf::blockmem_deallocate(this);
- } else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
- auto ext = get_user_data_extension();
- ext->deleter(data);
- ext->~UserDataExtension();
- this->~Block();
- free(this);
- }
- }
- }
-
- int ref_count() const {
- return nshared.load(butil::memory_order_relaxed);
- }
-
- bool full() const { return size >= cap; }
- size_t left_space() const { return cap - size; }
-
-private:
- bool is_samplable() {
- if (IsIOBufProfilerSamplable()) {
- flags |= IOBUF_BLOCK_FLAGS_SAMPLED;
- return true;
- }
- return false;
- }
-
- bool sampled() const {
- return flags & IOBUF_BLOCK_FLAGS_SAMPLED;
- }
-
- bool is_user_data() const {
- return flags & IOBUF_BLOCK_FLAGS_USER_DATA;
- }
-};
-
namespace iobuf {
// for unit test
@@ -341,23 +228,6 @@ uint32_t block_size(IOBuf::Block const* b) {
return b->size;
}
-inline IOBuf::Block* create_block(const size_t block_size) {
- if (block_size > 0xFFFFFFFFULL) {
- LOG(FATAL) << "block_size=" << block_size << " is too large";
- return NULL;
- }
- char* mem = (char*)iobuf::blockmem_allocate(block_size);
- if (mem == NULL) {
- return NULL;
- }
- return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
- block_size - sizeof(IOBuf::Block));
-}
-
-inline IOBuf::Block* create_block() {
- return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
-}
-
inline IOBuf::Block* create_block_aligned(size_t block_size, size_t alignment)
{
if (block_size > 0xFFFFFFFFULL) {
LOG(FATAL) << "block_size=" << block_size << " is too large";
@@ -376,28 +246,11 @@ inline IOBuf::Block* create_block_aligned(size_t
block_size, size_t alignment) {
}
// === Share TLS blocks between appending operations ===
-// Max number of blocks in each TLS. This is a soft limit namely
-// release_tls_block_chain() may exceed this limit sometimes.
-const int MAX_BLOCKS_PER_THREAD = 8;
-
-inline int max_blocks_per_thread() {
- // If IOBufProfiler is enabled, do not cache blocks in TLS.
- return IsIOBufProfilerEnabled() ? 0 : MAX_BLOCKS_PER_THREAD;
-}
-
-struct TLSData {
- // Head of the TLS block chain.
- IOBuf::Block* block_head;
-
- // Number of TLS blocks
- int num_blocks;
-
- // True if the remote_tls_block_chain is registered to the thread.
- bool registered;
-};
static __thread TLSData g_tls_data = { NULL, 0, false };
+// Returns the address of this thread's g_tls_data. Used by release_tls_block(), which is now defined inline in iobuf_inl.h.
+TLSData* get_g_tls_data() { return &g_tls_data; }
// Used in UT
IOBuf::Block* get_tls_block_head() { return g_tls_data.block_head; }
int get_tls_block_count() { return g_tls_data.num_blocks; }
@@ -407,6 +260,14 @@ int get_tls_block_count() { return g_tls_data.num_blocks; }
// of appending functions in IOPortal may be lowered.
static butil::static_atomic<size_t> g_num_hit_tls_threshold =
BUTIL_STATIC_ATOMIC_INIT(0);
+void inc_g_num_hit_tls_threshold() {  // Called by release_tls_block() when a block is dropped because the TLS cache already holds max_blocks_per_thread() entries.
+ g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
+}
+
+void dec_g_num_hit_tls_threshold() {  // Counterpart of inc_g_num_hit_tls_threshold(). NOTE(review): no caller is visible in this patch - confirm it is needed.
+ g_num_hit_tls_threshold.fetch_sub(1, butil::memory_order_relaxed);
+}
+
// Called in UT.
void remove_tls_block_chain() {
TLSData& tls_data = g_tls_data;
@@ -458,28 +319,6 @@ IOBuf::Block* share_tls_block() {
return new_block;
}
-// Return one block to TLS.
-inline void release_tls_block(IOBuf::Block* b) {
- if (!b) {
- return;
- }
- TLSData& tls_data = g_tls_data;
- if (b->full()) {
- b->dec_ref();
- } else if (tls_data.num_blocks >= max_blocks_per_thread()) {
- b->dec_ref();
- g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
- } else {
- b->u.portal_next = tls_data.block_head;
- tls_data.block_head = b;
- ++tls_data.num_blocks;
- if (!tls_data.registered) {
- tls_data.registered = true;
- butil::thread_atexit(remove_tls_block_chain);
- }
- }
-}
-
// Return chained blocks to TLS.
// NOTE: b MUST be non-NULL and all blocks linked SHOULD not be full.
void release_tls_block_chain(IOBuf::Block* b) {
@@ -2262,4 +2101,4 @@ bool IOBufBytesIterator::forward_one_block(const void**
data, size_t* size) {
void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n)
{
return butil::iobuf::cp(dest, src, n);
-}
+} // fast_memcpy
\ No newline at end of file
diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h
index c397a264..239e82d9 100644
--- a/src/butil/iobuf.h
+++ b/src/butil/iobuf.h
@@ -64,6 +64,8 @@ friend class IOBufAsZeroCopyInputStream;
friend class IOBufAsZeroCopyOutputStream;
friend class IOBufBytesIterator;
friend class IOBufCutter;
+friend class SingleIOBuf;
+
public:
static const size_t DEFAULT_BLOCK_SIZE = 8192;
static const size_t INITIAL_CAP = 32; // must be power of 2
@@ -773,4 +775,4 @@ inline void swap(butil::IOBuf& a, butil::IOBuf& b) {
#include "butil/iobuf_inl.h"
-#endif // BUTIL_IOBUF_H
+#endif // BUTIL_IOBUF_H
\ No newline at end of file
diff --git a/src/butil/iobuf_inl.h b/src/butil/iobuf_inl.h
index c49896a5..6b1f8751 100644
--- a/src/butil/iobuf_inl.h
+++ b/src/butil/iobuf_inl.h
@@ -24,10 +24,26 @@
#ifndef BUTIL_IOBUF_INL_H
#define BUTIL_IOBUF_INL_H
+#include "butil/atomicops.h" // butil::atomic
+#include "butil/thread_local.h" // thread_atexit
+#include "butil/logging.h" // CHECK, LOG
+
void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n);
namespace butil {
+using UserDataDeleter = std::function<void(void*)>;
+
+struct UserDataExtension {  // Holds the deleter of a user-data Block; constructed in place at `(char*)block + sizeof(Block)` (see Block::get_user_data_extension()).
+ UserDataDeleter deleter;
+};
+
+bool IsIOBufProfilerSamplable();
+void SubmitIOBufSample(IOBuf::Block* block, int64_t ref);
+
+const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 1 << 0;
+const uint16_t IOBUF_BLOCK_FLAGS_SAMPLED = 1 << 1;
+
inline ssize_t IOBuf::cut_into_file_descriptor(int fd, size_t size_hint) {
return pcut_into_file_descriptor(fd, -1, size_hint);
}
@@ -425,6 +441,212 @@ inline size_t IOBufBytesIterator::forward(size_t n) {
return nc;
}
+// Used by max_blocks_per_thread()
+bool IsIOBufProfilerEnabled();
+
+namespace iobuf {
+void inc_g_nblock();
+void dec_g_nblock();
+
+void inc_g_blockmem();
+void dec_g_blockmem();
+
+void inc_g_num_hit_tls_threshold();
+void dec_g_num_hit_tls_threshold();
+
+// Function pointers to allocate or deallocate memory for a IOBuf::Block
+extern void* (*blockmem_allocate)(size_t);
+extern void (*blockmem_deallocate)(void*);
+
+} // namespace iobuf
+
+// Reference-counted header that precedes (or, for user data, points at) the
+// bytes shared by IOBuf, IOPortal and SingleIOBuf.
+struct IOBuf::Block {
+    // Number of owners; the block is destroyed when it drops to zero.
+    butil::atomic<int> nshared;
+    // Bitwise OR of IOBUF_BLOCK_FLAGS_*.
+    uint16_t flags;
+    uint16_t abi_check;  // original cap, never be zero.
+    // Bytes of `data` already handed out.
+    uint32_t size;
+    // Total bytes available in `data`.
+    uint32_t cap;
+    // When flag is 0, portal_next is valid.
+    // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data_meta is valid.
+    union {
+        Block* portal_next;
+        uint64_t data_meta;
+    } u;
+    // When flag is 0, data points to `size` bytes starting at `(char*)this+sizeof(Block)'
+    // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data points to the user data and
+    // the deleter is put in UserDataExtension at `(char*)this+sizeof(Block)'
+    char* data;
+
+    // Builds an ordinary block whose payload of `data_size` bytes starts at
+    // `data_in`. Starts with one reference and no bytes used.
+    Block(char* data_in, uint32_t data_size)
+        : nshared(1)
+        , flags(0)
+        , abi_check(0)
+        , size(0)
+        , cap(data_size)
+        , u({NULL})
+        , data(data_in) {
+        iobuf::inc_g_nblock();
+        // NOTE(review): the constructor this one replaces added
+        // `data_size + sizeof(Block)` to g_blockmem; inc_g_blockmem() adds 1.
+        iobuf::inc_g_blockmem();
+        if (is_samplable()) {
+            SubmitIOBufSample(this, 1);
+        }
+    }
+
+    // Builds a block that wraps caller-owned memory. The block is created
+    // full (size == cap == data_size) and `deleter` is stored in the
+    // UserDataExtension that follows the header, so the caller must have
+    // allocated sizeof(Block) + sizeof(UserDataExtension) bytes for `this`.
+    Block(char* data_in, uint32_t data_size, UserDataDeleter deleter)
+        : nshared(1)
+        , flags(IOBUF_BLOCK_FLAGS_USER_DATA)
+        , abi_check(0)
+        , size(data_size)
+        , cap(data_size)
+        , u({0})
+        , data(data_in) {
+        auto ext = new (get_user_data_extension()) UserDataExtension();
+        ext->deleter = std::move(deleter);
+        if (is_samplable()) {
+            SubmitIOBufSample(this, 1);
+        }
+    }
+
+    // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
+    UserDataExtension* get_user_data_extension() {
+        char* p = (char*)this;
+        return (UserDataExtension*)(p + sizeof(Block));
+    }
+
+    // Debug-only sanity check: abi_check is always written as 0 by the
+    // constructors above, so a non-zero value means the memory was laid out
+    // by a different Block definition.
+    inline void check_abi() {
+#ifndef NDEBUG
+        if (abi_check != 0) {
+            LOG(FATAL) << "Your program seems to wrongly contain two "
+                "ABI-incompatible implementations of IOBuf";
+        }
+#endif
+    }
+
+    // Adds one owner.
+    void inc_ref() {
+        check_abi();
+        nshared.fetch_add(1, butil::memory_order_relaxed);
+        if (sampled()) {
+            SubmitIOBufSample(this, 1);
+        }
+    }
+
+    // Removes one owner and destroys the block when it was the last one.
+    // Ordinary blocks go back through iobuf::blockmem_deallocate; user-data
+    // blocks run the stored deleter on `data` and free() the header.
+    void dec_ref() {
+        check_abi();
+        if (sampled()) {
+            SubmitIOBufSample(this, -1);
+        }
+        if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
+            butil::atomic_thread_fence(butil::memory_order_acquire);
+            if (!is_user_data()) {
+                iobuf::dec_g_nblock();
+                iobuf::dec_g_blockmem();
+                this->~Block();
+                iobuf::blockmem_deallocate(this);
+            } else {
+                auto ext = get_user_data_extension();
+                ext->deleter(data);
+                ext->~UserDataExtension();
+                this->~Block();
+                free(this);
+            }
+        }
+    }
+
+    int ref_count() const {
+        return nshared.load(butil::memory_order_relaxed);
+    }
+
+    // True when no bytes are left to hand out.
+    bool full() const { return size >= cap; }
+    // Bytes still available after `size`.
+    size_t left_space() const { return cap - size; }
+
+private:
+    // Asks the profiler whether this block should be sampled and, if so,
+    // remembers the decision in `flags`.
+    bool is_samplable() {
+        if (IsIOBufProfilerSamplable()) {
+            flags |= IOBUF_BLOCK_FLAGS_SAMPLED;
+            return true;
+        }
+        return false;
+    }
+
+    bool sampled() const {
+        return flags & IOBUF_BLOCK_FLAGS_SAMPLED;
+    }
+
+    bool is_user_data() const {
+        return flags & IOBUF_BLOCK_FLAGS_USER_DATA;
+    }
+};
+
+namespace iobuf {
+struct TLSData {  // Per-thread block cache state; release_tls_block() pushes blocks onto this chain.
+ // Head of the TLS block chain, linked through Block::u.portal_next.
+ IOBuf::Block* block_head;
+
+ // Number of TLS blocks
+ int num_blocks;
+
+ // True if remove_tls_block_chain is registered to the thread (via butil::thread_atexit).
+ bool registered;
+};
+
+// Max number of blocks in each TLS. This is a soft limit namely
+// release_tls_block_chain() may exceed this limit sometimes.
+const int MAX_BLOCKS_PER_THREAD = 8;
+
+// Effective per-thread cache limit: nothing is cached in TLS while the
+// IOBufProfiler is enabled, otherwise MAX_BLOCKS_PER_THREAD applies.
+inline int max_blocks_per_thread() {
+    if (IsIOBufProfilerEnabled()) {
+        return 0;
+    }
+    return MAX_BLOCKS_PER_THREAD;
+}
+
+TLSData* get_g_tls_data();
+void remove_tls_block_chain();
+
+IOBuf::Block* acquire_tls_block();
+
+// Hands one block back to the calling thread's TLS cache.
+// NULL is ignored. A full block is dereferenced instead of cached, and so is
+// any block offered while the cache already holds max_blocks_per_thread()
+// entries (that case is also counted via inc_g_num_hit_tls_threshold()).
+inline void release_tls_block(IOBuf::Block* b) {
+    if (b == NULL) {
+        return;
+    }
+    TLSData* const tls = get_g_tls_data();
+    if (b->full()) {
+        b->dec_ref();
+        return;
+    }
+    if (tls->num_blocks >= max_blocks_per_thread()) {
+        b->dec_ref();
+        inc_g_num_hit_tls_threshold();
+        return;
+    }
+    // Push onto the head of the per-thread chain.
+    b->u.portal_next = tls->block_head;
+    tls->block_head = b;
+    ++tls->num_blocks;
+    // Make sure the chain is torn down when the thread exits.
+    if (!tls->registered) {
+        tls->registered = true;
+        butil::thread_atexit(remove_tls_block_chain);
+    }
+}
+
+// Allocates `block_size` bytes through iobuf::blockmem_allocate and constructs
+// a Block at the start of them; the remaining bytes become the payload.
+// Returns NULL when block_size does not fit in 32 bits or allocation fails.
+inline IOBuf::Block* create_block(const size_t block_size) {
+    if (block_size > 0xFFFFFFFFULL) {
+        LOG(FATAL) << "block_size=" << block_size << " is too large";
+        return NULL;
+    }
+    char* const raw = (char*)iobuf::blockmem_allocate(block_size);
+    if (raw == NULL) {
+        return NULL;
+    }
+    char* const payload = raw + sizeof(IOBuf::Block);
+    const size_t payload_cap = block_size - sizeof(IOBuf::Block);
+    return new (raw) IOBuf::Block(payload, payload_cap);
+}
+
+inline IOBuf::Block* create_block() {  // Convenience overload: a block of IOBuf::DEFAULT_BLOCK_SIZE total bytes (header included).
+ return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
+}
+
+void* cp(void *__restrict dest, const void *__restrict src, size_t n);
+
+}; // namespace iobuf;
+
} // namespace butil
-#endif // BUTIL_IOBUF_INL_H
+#endif // BUTIL_IOBUF_INL_H
\ No newline at end of file
diff --git a/src/butil/single_iobuf.cpp b/src/butil/single_iobuf.cpp
new file mode 100644
index 00000000..c51e8fff
--- /dev/null
+++ b/src/butil/single_iobuf.cpp
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// SingleIOBuf - A continuous zero-copied buffer
+
+#include "butil/logging.h" // CHECK, LOG
+#include "butil/iobuf.h"
+#include "butil/iobuf_inl.h"
+#include "butil/single_iobuf.h"
+
+namespace butil {
+
+// Builds an empty buffer: no cached allocation block and a null data ref.
+SingleIOBuf::SingleIOBuf()
+    : _cur_block(NULL)
+    , _block_size(0) {
+    _cur_ref.block = NULL;
+    _cur_ref.length = 0;
+    _cur_ref.offset = 0;
+}
+
+// Wraps an existing BlockRef, sharing (not copying) the block it refers to.
+// A ref whose block is NULL produces an empty SingleIOBuf.
+SingleIOBuf::SingleIOBuf(const IOBuf::BlockRef& ref)
+    : _cur_block(NULL)
+    , _block_size(0) {
+    // _cur_ref has to be initialized on every path: the destructor calls
+    // reset(), which reads _cur_ref.block. The default constructor sets the
+    // fields one by one, so BlockRef is not relied upon to initialize itself.
+    _cur_ref.offset = 0;
+    _cur_ref.length = 0;
+    _cur_ref.block = NULL;
+    if (ref.block) {
+        _cur_ref = ref;
+        _cur_ref.block->inc_ref();
+    }
+}
+
+// Copy constructor: shares other's data block (one extra reference) but not
+// its cached allocation block, so the copy starts with _cur_block == NULL.
+SingleIOBuf::SingleIOBuf(const SingleIOBuf& other)
+    : _cur_block(NULL)
+    , _block_size(0) {
+    // Initialize _cur_ref even when `other` is empty; reset() (run by the
+    // destructor) reads _cur_ref.block.
+    _cur_ref.offset = 0;
+    _cur_ref.length = 0;
+    _cur_ref.block = NULL;
+    if (other._cur_ref.block != NULL) {
+        _cur_ref = other._cur_ref;
+        _cur_ref.block->inc_ref();
+    }
+}
+
+SingleIOBuf::~SingleIOBuf() {  // Releases the cached allocation block and the data reference via reset().
+ reset();
+}
+
+// Copy assignment: releases whatever this object holds, then shares rhs's
+// data block. The cached allocation block of rhs is not shared.
+SingleIOBuf& SingleIOBuf::operator=(const SingleIOBuf& rhs) {
+    // Without this guard, reset() would clear rhs too and `x = x` would
+    // silently empty the buffer.
+    if (this == &rhs) {
+        return *this;
+    }
+    reset();
+    _block_size = 0;
+    if (rhs._cur_ref.block != NULL) {
+        _cur_ref = rhs._cur_ref;
+        _cur_ref.block->inc_ref();
+    }
+    return *this;
+}
+
+// Exchanges the data ref, the cached allocation block and its recorded size
+// with `other`. Reference counts are untouched because ownership only moves.
+void SingleIOBuf::swap(SingleIOBuf& other) {
+    if (this == &other) {
+        return;
+    }
+    std::swap(_cur_ref, other._cur_ref);
+    std::swap(_cur_block, other._cur_block);
+    std::swap(_block_size, other._block_size);
+}
+
+// Reserves `size` contiguous bytes and makes them the current data region.
+// Returns a pointer to the first byte, or NULL when no block could be
+// obtained (alloc_block_by_size() sets errno). Any region held before the
+// call is released, matching reallocate_downward(), assign() and
+// assign_user_data().
+void* SingleIOBuf::allocate(uint32_t size) {
+    IOBuf::Block* b = alloc_block_by_size(size);
+    if (!b) {
+        return NULL;
+    }
+    // Take the new reference before dropping the old one so the count never
+    // transiently under-represents the owners when both refer to `b`.
+    b->inc_ref();
+    if (_cur_ref.block != NULL) {
+        // Previously this reference was simply overwritten and leaked.
+        _cur_ref.block->dec_ref();
+    }
+    _cur_ref.offset = b->size;
+    _cur_ref.length = size;
+    _cur_ref.block = b;
+    b->size += size;
+    return b->data + _cur_ref.offset;
+}
+
+// Releases the current region, but only when `p` is exactly its start
+// address; any other pointer (or an empty buffer) is ignored.
+void SingleIOBuf::deallocate(void* p) {
+    if (_cur_ref.block == NULL) {
+        return;
+    }
+    const char* const begin = _cur_ref.block->data + _cur_ref.offset;
+    if (begin == (char*)p) {
+        reset();
+    }
+}
+
+// Returns a block with at least `data_size` free bytes and caches it in
+// _cur_block (which owns one reference to it).
+//  - The cached block is reused when it still has room.
+//  - Requests that fit a default-sized block try the thread-local cache
+//    first and fall back to a fresh default block.
+//  - Larger requests get a dedicated block; _block_size records its total
+//    size so reset() knows not to return it to the TLS cache.
+// Returns NULL with errno = ENOMEM when the request cannot be satisfied.
+IOBuf::Block* SingleIOBuf::alloc_block_by_size(uint32_t data_size) {
+    if (_cur_block != NULL) {
+        if (_cur_block->left_space() >= data_size) {
+            return _cur_block;
+        }
+        _cur_block->dec_ref();
+        _cur_block = NULL;
+        // Forget the size of the block just dropped; a stale non-zero value
+        // would make reset() treat a TLS-sized successor as a dedicated block.
+        _block_size = 0;
+    }
+    // Compute in 64 bits: in uint32_t the sum can wrap around for data_size
+    // close to UINT32_MAX, which used to select the default-block path and
+    // return a block far smaller than requested.
+    const uint64_t total_size = (uint64_t)data_size + sizeof(IOBuf::Block);
+    if (BAIDU_UNLIKELY(total_size > 0xFFFFFFFFULL)) {
+        errno = ENOMEM;
+        return NULL;
+    }
+    if (total_size <= IOBuf::DEFAULT_BLOCK_SIZE) {
+        _cur_block = iobuf::acquire_tls_block();
+        if (_cur_block != NULL && _cur_block->left_space() < data_size) {
+            // The cached TLS block is too full for this request.
+            _cur_block->dec_ref();
+            _cur_block = NULL;
+        }
+        if (_cur_block == NULL) {
+            _cur_block = iobuf::create_block();
+        }
+    } else {
+        _cur_block = iobuf::create_block(total_size);
+        _block_size = (uint32_t)total_size;
+    }
+    if (BAIDU_UNLIKELY(!_cur_block)) {
+        errno = ENOMEM;
+        _block_size = 0;
+        return NULL;
+    }
+    return _cur_block;
+}
+
+// Moves the live parts of a buffer that grows "downward": the last
+// `in_use_back` bytes of the old buffer land at the end of the new one, and
+// the first `in_use_front` bytes land at its start. Nothing in between is
+// copied.
+void SingleIOBuf::memcpy_downward(void* old_p, uint32_t old_size,
+                                  void* new_p, uint32_t new_size,
+                                  uint32_t in_use_back, uint32_t in_use_front) {
+    char* const dst = (char*)new_p;
+    char* const src = (char*)old_p;
+    // Tail: aligned to the end of each buffer.
+    memcpy(dst + new_size - in_use_back, src + old_size - in_use_back, in_use_back);
+    // Head: aligned to the start of each buffer.
+    memcpy(dst, src, in_use_front);
+}
+
+// Grows the current region to `new_size` bytes in a (possibly new) block and
+// carries over its live parts with memcpy_downward(): the trailing
+// `in_use_back` bytes and the leading `in_use_front` bytes.
+// Returns the start of the new region, or NULL when
+//  - new_size is not larger than the current length (errno = EINVAL),
+//  - there is no current region (errno = EINVAL),
+//  - in_use_back or in_use_front exceeds the current length (errno = EINVAL),
+//  - no block could be obtained (errno set by alloc_block_by_size()).
+// On failure the current region is left untouched.
+void* SingleIOBuf::reallocate_downward(uint32_t new_size, uint32_t in_use_back,
+                                       uint32_t in_use_front) {
+    IOBuf::BlockRef& ref = _cur_ref;
+    if (BAIDU_UNLIKELY(new_size <= ref.length)) {
+        LOG(ERROR) << "invalid new size:" << new_size;
+        errno = EINVAL;
+        return NULL;
+    }
+    if (BAIDU_UNLIKELY(ref.block == NULL)) {
+        LOG(ERROR) << "SingleIOBuf reallocate_downward failed. Block cannot be null!";
+        errno = EINVAL;
+        return NULL;
+    }
+    char* old_p = ref.block->data + ref.offset;
+    uint32_t old_size = ref.length;
+    // Reject spans larger than the old region before touching memory;
+    // otherwise memcpy_downward() would read before old_p or past its end.
+    if (BAIDU_UNLIKELY(in_use_back > old_size || in_use_front > old_size)) {
+        LOG(ERROR) << "invalid in-use sizes, back:" << in_use_back
+                   << " front:" << in_use_front << " old size:" << old_size;
+        errno = EINVAL;
+        return NULL;
+    }
+    IOBuf::Block* b = alloc_block_by_size(new_size);
+    if (!b) {
+        return NULL;
+    }
+    // The old region stays valid here even if alloc_block_by_size() dropped
+    // the cached block: _cur_ref still holds its own reference.
+    char* new_p = b->data + b->size;
+    memcpy_downward(old_p, old_size,
+                    new_p, new_size,
+                    in_use_back, in_use_front);
+    ref.block->dec_ref();
+    _cur_ref.offset = b->size;
+    _cur_ref.length = new_size;
+    _cur_ref.block = b;
+    b->size += new_size;
+    b->inc_ref();
+    return new_p;
+}
+
+// Start address of the current region, or NULL for an empty buffer.
+const void* SingleIOBuf::get_begin() const {
+    return _cur_ref.block != NULL
+        ? _cur_ref.block->data + _cur_ref.offset
+        : NULL;
+}
+
+uint32_t SingleIOBuf::get_length() const {  // Length in bytes of the current region; reset() sets it to 0.
+ return _cur_ref.length;
+}
+
+// Returns the object to the empty state.
+// The cached allocation block goes back to the thread-local cache when it is
+// a default-sized block (_block_size == 0) and is simply dereferenced when it
+// is a dedicated oversized block. The data reference, if any, is dropped too.
+void SingleIOBuf::reset() {
+    if (_cur_block != NULL) {
+        IOBuf::Block* const cached = _cur_block;
+        if (_block_size != 0) {
+            cached->dec_ref();
+            _block_size = 0;
+        } else {
+            iobuf::release_tls_block(cached);
+        }
+        _cur_block = NULL;
+    }
+    if (_cur_ref.block != NULL) {
+        _cur_ref.block->dec_ref();
+    }
+    _cur_ref.block = NULL;
+    _cur_ref.length = 0;
+    _cur_ref.offset = 0;
+}
+
+// Makes this buffer refer to the first `msg_size` bytes of `buf` as one
+// contiguous region.
+//  - If the first BlockRef of `buf` already covers msg_size bytes, its block
+//    is shared without copying.
+//  - Otherwise the bytes are gathered from the successive BlockRefs into a
+//    block obtained from alloc_block_by_size().
+// Returns false when `buf` is shorter than msg_size or no block could be
+// obtained; in both cases the current content is kept. msg_size == 0 returns
+// true and also leaves the current content untouched.
+bool SingleIOBuf::assign(const IOBuf& buf, uint32_t msg_size) {
+    if (BAIDU_UNLIKELY(buf.length() < msg_size)) {
+        LOG(ERROR) << "Fail to dump_mult_iobuf msg_size:" << msg_size
+                   << " from source iobuf of size:" << buf.length();
+        return false;
+    }
+    if (BAIDU_UNLIKELY(msg_size == 0)) {
+        return true;
+    }
+
+    const IOBuf::BlockRef& ref = buf._front_ref();
+    if (ref.length >= msg_size) {
+        // Zero-copy path. `buf` keeps its own reference, so the block stays
+        // alive across reset() even if we were already pointing at it.
+        reset();
+        _cur_ref.offset = ref.offset;
+        _cur_ref.length = msg_size;
+        _cur_ref.block = ref.block;
+        _cur_ref.block->inc_ref();
+        return true;
+    }
+
+    IOBuf::Block* b = alloc_block_by_size(msg_size);
+    if (!b) {
+        return false;
+    }
+    // Do NOT call reset() here: it would release _cur_block, which is `b`.
+    // A dedicated block would be freed on the spot, and a default-sized one
+    // is freed whenever the TLS cache refuses it, so the copy below would
+    // write into freed memory.
+    char* out = b->data + b->size;
+    const size_t nref = buf.backing_block_num();
+    uint32_t last_len = msg_size;
+    for (size_t i = 0; i < nref && last_len > 0; ++i) {
+        const IOBuf::BlockRef& r = buf._ref_at(i);
+        uint32_t n = std::min(r.length, last_len);
+        iobuf::cp(out, r.block->data + r.offset, n);
+        last_len -= n;
+        out += n;
+    }
+    // Acquire the new reference first, then drop only the previous data
+    // reference; _cur_block keeps its own reference to `b`.
+    b->inc_ref();
+    if (_cur_ref.block != NULL) {
+        _cur_ref.block->dec_ref();
+    }
+    _cur_ref.offset = b->size;
+    _cur_ref.length = msg_size;
+    _cur_ref.block = b;
+    b->size += msg_size;
+    return true;
+}
+
+// Appends the current region to `buf` by pushing its BlockRef.
+// Does nothing when `buf` is NULL or this buffer is empty.
+void SingleIOBuf::append_to(IOBuf* buf) const {
+    if (buf == NULL || _cur_ref.block == NULL) {
+        return;
+    }
+    buf->_push_back_ref(_cur_ref);
+}
+
+// Points this buffer at caller-provided memory without copying it.
+// A user-data Block header is malloc'ed to track `data`; `deleter` is stored
+// in it, and ::free is used when no deleter is given.
+// Returns 0 on success, -1 when `size` is too large or the header cannot be
+// allocated (the current content is kept in both failure cases).
+int SingleIOBuf::assign_user_data(void* data, size_t size, std::function<void(void*)> deleter) {
+    if (size > 0xFFFFFFFFULL - 100) {
+        LOG(FATAL) << "data_size=" << size << " is too large";
+        return -1;
+    }
+    const size_t header_bytes = sizeof(IOBuf::Block) + sizeof(UserDataExtension);
+    char* const raw = (char*)malloc(header_bytes);
+    if (raw == NULL) {
+        return -1;
+    }
+    if (!deleter) {
+        deleter = ::free;
+    }
+    // Release the previous content only once nothing else can fail.
+    reset();
+    IOBuf::Block* const user_block = new (raw) IOBuf::Block((char*)data, size, deleter);
+    _cur_ref.block = user_block;
+    _cur_ref.offset = 0;
+    _cur_ref.length = user_block->cap;
+    return 0;
+}
+
+// Adds one reference to the IOBuf::Block that `b` points to.
+// `b` is dereferenced unconditionally, so it must not be NULL.
+void SingleIOBuf::target_block_inc_ref(void* b) {
+    static_cast<IOBuf::Block*>(b)->inc_ref();
+}
+
+// Drops one reference from the IOBuf::Block that `b` points to.
+// `b` is dereferenced unconditionally, so it must not be NULL.
+void SingleIOBuf::target_block_dec_ref(void* b) {
+    static_cast<IOBuf::Block*>(b)->dec_ref();
+}
+
+} // namespace butil
\ No newline at end of file
diff --git a/src/butil/single_iobuf.h b/src/butil/single_iobuf.h
new file mode 100644
index 00000000..9c4237aa
--- /dev/null
+++ b/src/butil/single_iobuf.h
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// SingleIOBuf - A continuous zero-copied buffer
+
+#ifndef BUTIL_SINGLE_IOBUF_H
+#define BUTIL_SINGLE_IOBUF_H
+
+#include "butil/iobuf.h"
+#include "butil/iobuf_inl.h"
+
+namespace butil {
+
+// SingleIOBuf is a lightweight buffer that manages a single IOBuf::Block.
+// It always ensures that the underlying memory is contiguous and
+// avoids unnecessary memory copies.
+// It is primarily used to efficiently serialize and deserialize
+// RPC requests in flatbuffers.
+class SingleIOBuf {
+public:
+    // Creates an empty buffer.
+    SingleIOBuf();
+    // Releases the cached block and the data reference (see reset()).
+    ~SingleIOBuf();
+    // Shares the block referenced by `ref`.
+    SingleIOBuf(const IOBuf::BlockRef& ref);
+    // Copies share the data block; the cached allocation block is not shared.
+    SingleIOBuf(const SingleIOBuf& other);
+    SingleIOBuf& operator=(const SingleIOBuf& rhs);
+    // Exchanges all state with `other` without touching reference counts.
+    void swap(SingleIOBuf& other);
+
+    // Allocates a contiguous memory region of the specified size.
+    // Returns a pointer to the start of allocated space within the block,
+    // or NULL when no block could be obtained.
+    void* allocate(uint32_t size);
+    // Deallocates the block if the given pointer matches its starting address.
+    void deallocate(void* p);
+
+    // Reallocates the current buffer to a larger size.
+    // in_use_back indicates the memory used by message data; these bytes are
+    // copied from the end of the old buffer to the end of the new one.
+    // in_use_front indicates the memory used by metadata; these bytes are
+    // copied from the start of the old buffer to the start of the new one.
+    // Returns NULL (errno = EINVAL) when new_size is not larger than the
+    // current length or when the buffer is empty.
+    void* reallocate_downward(uint32_t new_size,
+                              uint32_t in_use_back,
+                              uint32_t in_use_front);
+
+    // Returns a pointer to the beginning of the current buffer data,
+    // or NULL when the buffer is empty.
+    const void* get_begin() const;
+
+    // Get the length of the SingleIOBuf.
+    uint32_t get_length() const;
+
+    // Reset the SingleIOBuf by releasing the block and clearing the BlockRef.
+    void reset();
+
+    // Assigns the first msg_size bytes of the given IOBuf to this SingleIOBuf.
+    // If the source contains multiple BlockRef segments,
+    // they will be concatenated into a single contiguous block.
+    // Returns false when buf holds fewer than msg_size bytes or no block
+    // could be obtained.
+    bool assign(const IOBuf& buf, uint32_t msg_size);
+
+    // Appends the current block of the SingleIOBuf to the given IOBuf.
+    void append_to(IOBuf* buf) const;
+
+    const IOBuf::BlockRef& get_cur_ref() const { return _cur_ref; }
+    void* get_cur_block() { return (void*)_cur_ref.block; }
+
+    // Returns the number of underlying blocks in the SingleIOBuf,
+    // which is either 1 if a block exists or 0 if none.
+    size_t backing_block_num() const { return _cur_ref.block != NULL ? 1 : 0; }
+
+    // Assigns user data to the SingleIOBuf,
+    // updates _cur_ref to point to the block storing the data.
+    // A NULL deleter means ::free. Returns 0 on success, -1 on failure.
+    int assign_user_data(void* data, size_t size, std::function<void(void*)> deleter);
+
+    // Increments the reference count of the specified target block.
+    static void target_block_inc_ref(void* b);
+    // Decrements the reference count of the specified target block.
+    static void target_block_dec_ref(void* b);
+
+protected:
+    // Copy from the old buffer to the corresponding positions in the new block.
+    void memcpy_downward(void* old_p, uint32_t old_size,
+                         void* new_p, uint32_t new_size,
+                         uint32_t in_use_back, uint32_t in_use_front);
+
+    // Allocates and returns a memory block large enough to hold the specified data size,
+    // reusing an existing block when possible or creating a new one if necessary.
+    IOBuf::Block* alloc_block_by_size(uint32_t data_size);
+
+private:
+    // Current block the SingleIOBuf used to allocate memory.
+    IOBuf::Block* _cur_block;
+    // Current block total size, include sizeof(IOBuf::Block).
+    // Non-zero only for a dedicated (larger than default) block.
+    uint32_t _block_size;
+    // Point to the block storing the data.
+    IOBuf::BlockRef _cur_ref;
+};
+
+} // namespace butil
+
+#endif
\ No newline at end of file
diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp
index a919cbda..679cdfe7 100644
--- a/test/iobuf_unittest.cpp
+++ b/test/iobuf_unittest.cpp
@@ -29,6 +29,7 @@
#include <butil/time.h> // Timer
#include <butil/fd_utility.h> // make_non_blocking
#include <butil/iobuf.h>
+#include <butil/single_iobuf.h>
#include <butil/logging.h>
#include <butil/fd_guard.h>
#include <butil/errno.h>
@@ -1890,4 +1891,59 @@ TEST_F(IOBufTest, reserve_aligned) {
}
}
+// Exercises SingleIOBuf end to end: gathering a two-block IOBuf into one
+// contiguous region, appending to an IOBuf, swap/reset, allocate,
+// reallocate_downward (success and failure) and construction from a BlockRef.
+TEST_F(IOBufTest, single_iobuf) {
+    butil::IOBuf buf1;
+    // It will be freed by IOBuf.
+    char *usr_str = (char *)malloc(16);
+    memset(usr_str, 0, 16);
+    char src_str[] = "abcdefgh12345678";
+    // Includes the terminating NUL so the strcmp() below is well defined.
+    size_t total_len = sizeof(src_str);
+    strncpy(usr_str, src_str + 8, total_len - 8);
+    buf1.append(src_str, 8);
+    buf1.append_user_data(usr_str, total_len - 8, NULL);
+    ASSERT_EQ(2, buf1.backing_block_num());
+    butil::SingleIOBuf sbuf;
+    ASSERT_EQ(0, sbuf.backing_block_num());
+    // The data spans two blocks, so assign() has to copy; check it succeeded
+    // instead of discarding the result.
+    ASSERT_TRUE(sbuf.assign(buf1, total_len));
+    ASSERT_EQ(1, sbuf.backing_block_num());
+    size_t s_len = sbuf.get_length();
+    ASSERT_EQ(s_len, total_len);
+    const char* str = (const char*) sbuf.get_begin();
+    int ret = strcmp(str, src_str);
+    ASSERT_EQ(0, ret);
+    butil::IOBuf buf2;
+    sbuf.append_to(&buf2);
+    ASSERT_EQ(buf2.length(), total_len);
+    butil::SingleIOBuf sbuf2;
+    sbuf2.swap(sbuf);
+    ASSERT_EQ(sbuf.get_length(), 0);
+    ASSERT_EQ(sbuf2.get_length(), total_len);
+    sbuf2.reset();
+    ASSERT_EQ(0, sbuf2.get_length());
+
+    void* buf = sbuf.allocate(1024);
+    ASSERT_TRUE(NULL != buf);
+    buf = sbuf.reallocate_downward(16384, 0, 0);
+    ASSERT_TRUE(NULL != buf);
+    s_len = sbuf.get_length();
+    ASSERT_EQ(16384, s_len);
+
+    butil::IOBuf::BlockRef ref = sbuf.get_cur_ref();
+    butil::SingleIOBuf sbuf3(ref);
+    s_len = sbuf3.get_length();
+    ASSERT_EQ(16384, s_len);
+    sbuf.deallocate(buf);
+
+    // Shrinking is rejected: NULL is returned and errno reports EINVAL.
+    errno = 0;
+    void *null_buf = sbuf3.reallocate_downward(s_len - 1, 0, 0);
+    ASSERT_EQ(null_buf, nullptr);
+    ASSERT_EQ(EINVAL, errno);
+
+    // Grow while preserving the front, then while preserving the back.
+    uint32_t old_size = sbuf3.get_length();
+    void *p = sbuf3.reallocate_downward(old_size + 16, 0, old_size);
+    ASSERT_TRUE(p != nullptr);
+    old_size = sbuf3.get_length();
+    p = sbuf3.reallocate_downward(old_size + 16, old_size, 0);
+    ASSERT_TRUE(p != nullptr);
+}
+
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]