This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new ffeb4077 Add SingleIOBuf for efficient flatbuffers 
serialization/deserialization (#3062)
ffeb4077 is described below

commit ffeb40772f0eac3772176519ff6aaab80ff2d91e
Author: Q1ngbo <[email protected]>
AuthorDate: Thu Aug 28 14:34:16 2025 +0800

    Add SingleIOBuf for efficient flatbuffers serialization/deserialization 
(#3062)
    
    * Add SingleIOBuf for efficient flatbuffers serialization/deserialization
    
    Co-authored-by: lishuo02<[email protected]>
    Co-authored-by: xulei25 <[email protected]>
    
    * Add SingleIOBuf for efficient flatbuffers serialization/deserialization
    
    Co-authored-by: lishuo02<[email protected]>
    Co-authored-by: xulei25 <[email protected]>
    Co-authored-by: qiqingbo<[email protected]>
    
    ---------
    
    Co-authored-by: xulei25 <[email protected]>
---
 BUILD.bazel                |   1 +
 CMakeLists.txt             |   1 +
 Makefile                   |   1 +
 src/butil/iobuf.cpp        | 211 ++++-----------------------------
 src/butil/iobuf.h          |   4 +-
 src/butil/iobuf_inl.h      | 224 ++++++++++++++++++++++++++++++++++-
 src/butil/single_iobuf.cpp | 285 +++++++++++++++++++++++++++++++++++++++++++++
 src/butil/single_iobuf.h   | 109 +++++++++++++++++
 test/iobuf_unittest.cpp    |  56 +++++++++
 9 files changed, 704 insertions(+), 188 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index d2b3d946..b4f57d94 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -226,6 +226,7 @@ BUTIL_SRCS = [
     "src/butil/crc32c.cc",
     "src/butil/containers/case_ignored_flat_map.cpp",
     "src/butil/iobuf.cpp",
+    "src/butil/single_iobuf.cpp",
     "src/butil/iobuf_profiler.cpp",
     "src/butil/binary_printer.cpp",
     "src/butil/recordio.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 92f7114b..45fcf611 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -475,6 +475,7 @@ set(BUTIL_SOURCES
     ${PROJECT_SOURCE_DIR}/src/butil/crc32c.cc
     ${PROJECT_SOURCE_DIR}/src/butil/containers/case_ignored_flat_map.cpp
     ${PROJECT_SOURCE_DIR}/src/butil/iobuf.cpp
+    ${PROJECT_SOURCE_DIR}/src/butil/single_iobuf.cpp
     ${PROJECT_SOURCE_DIR}/src/butil/iobuf_profiler.cpp
     ${PROJECT_SOURCE_DIR}/src/butil/binary_printer.cpp
     ${PROJECT_SOURCE_DIR}/src/butil/recordio.cc
diff --git a/Makefile b/Makefile
index fce3d82d..16a10ae2 100644
--- a/Makefile
+++ b/Makefile
@@ -163,6 +163,7 @@ BUTIL_SOURCES = \
     src/butil/crc32c.cc \
     src/butil/containers/case_ignored_flat_map.cpp \
     src/butil/iobuf.cpp \
+    src/butil/single_iobuf.cpp \
     src/butil/iobuf_profiler.cpp \
     src/butil/binary_printer.cpp \
     src/butil/recordio.cc \
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index 208eedd2..f1f3d023 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -183,6 +183,20 @@ butil::static_atomic<size_t> g_nblock = 
BUTIL_STATIC_ATOMIC_INIT(0);
 butil::static_atomic<size_t> g_blockmem = BUTIL_STATIC_ATOMIC_INIT(0);
 butil::static_atomic<size_t> g_newbigview = BUTIL_STATIC_ATOMIC_INIT(0);
 
+void inc_g_nblock() {
+    g_nblock.fetch_add(1, butil::memory_order_relaxed);
+}
+void dec_g_nblock() {
+    g_nblock.fetch_sub(1, butil::memory_order_relaxed);
+}
+
+void inc_g_blockmem() {
+    g_blockmem.fetch_add(1, butil::memory_order_relaxed);
+}
+void dec_g_blockmem() {
+    g_blockmem.fetch_sub(1, butil::memory_order_relaxed);
+}
+
 }  // namespace iobuf
 
 size_t IOBuf::block_count() {
@@ -197,133 +211,6 @@ size_t IOBuf::new_bigview_count() {
     return iobuf::g_newbigview.load(butil::memory_order_relaxed);
 }
 
-const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 1 << 0;
-const uint16_t IOBUF_BLOCK_FLAGS_SAMPLED = 1 << 1;
-using UserDataDeleter = std::function<void(void*)>;
-
-struct UserDataExtension {
-    UserDataDeleter deleter;
-};
-
-struct IOBuf::Block {
-    butil::atomic<int> nshared;
-    uint16_t flags;
-    uint16_t abi_check;  // original cap, never be zero.
-    uint32_t size;
-    uint32_t cap;
-    // When flag is 0, portal_next is valid.
-    // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data_meta is valid.
-    union {
-        Block* portal_next;
-        uint64_t data_meta;
-    } u;
-    // When flag is 0, data points to `size` bytes starting at 
`(char*)this+sizeof(Block)'
-    // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data points to the 
user data and
-    // the deleter is put in UserDataExtension at `(char*)this+sizeof(Block)'
-    char* data;
-        
-    Block(char* data_in, uint32_t data_size)
-        : nshared(1)
-        , flags(0)
-        , abi_check(0)
-        , size(0)
-        , cap(data_size)
-        , u({NULL})
-        , data(data_in) {
-        iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
-        iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
-                                    butil::memory_order_relaxed);
-        if (is_samplable()) {
-            SubmitIOBufSample(this, 1);
-        }
-    }
-
-    Block(char* data_in, uint32_t data_size, UserDataDeleter deleter)
-        : nshared(1)
-        , flags(IOBUF_BLOCK_FLAGS_USER_DATA)
-        , abi_check(0)
-        , size(data_size)
-        , cap(data_size)
-        , u({0})
-        , data(data_in) {
-        auto ext = new (get_user_data_extension()) UserDataExtension();
-        ext->deleter = std::move(deleter);
-        if (is_samplable()) {
-            SubmitIOBufSample(this, 1);
-        }
-    }
-
-    // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
-    UserDataExtension* get_user_data_extension() {
-        char* p = (char*)this;
-        return (UserDataExtension*)(p + sizeof(Block));
-    }
-
-    inline void check_abi() {
-#ifndef NDEBUG
-        if (abi_check != 0) {
-            LOG(FATAL) << "Your program seems to wrongly contain two "
-                "ABI-incompatible implementations of IOBuf";
-        }
-#endif
-    }
-
-    void inc_ref() {
-        check_abi();
-        nshared.fetch_add(1, butil::memory_order_relaxed);
-        if (sampled()) {
-            SubmitIOBufSample(this, 1);
-        }
-    }
-        
-    void dec_ref() {
-        check_abi();
-        if (sampled()) {
-            SubmitIOBufSample(this, -1);
-        }
-        if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
-            butil::atomic_thread_fence(butil::memory_order_acquire);
-            if (!is_user_data()) {
-                iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
-                iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
-                                            butil::memory_order_relaxed);
-                this->~Block();
-                iobuf::blockmem_deallocate(this);
-            } else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
-                auto ext = get_user_data_extension();
-                ext->deleter(data);
-                ext->~UserDataExtension();
-                this->~Block();
-                free(this);
-            }
-        }
-    }
-
-    int ref_count() const {
-        return nshared.load(butil::memory_order_relaxed);
-    }
-
-    bool full() const { return size >= cap; }
-    size_t left_space() const { return cap - size; }
-
-private:
-    bool is_samplable() {
-        if (IsIOBufProfilerSamplable()) {
-            flags |= IOBUF_BLOCK_FLAGS_SAMPLED;
-            return true;
-        }
-        return false;
-    }
-
-    bool sampled() const {
-        return flags & IOBUF_BLOCK_FLAGS_SAMPLED;
-    }
-
-    bool is_user_data() const {
-        return flags & IOBUF_BLOCK_FLAGS_USER_DATA;
-    }
-};
-
 namespace iobuf {
 
 // for unit test
@@ -341,23 +228,6 @@ uint32_t block_size(IOBuf::Block const* b) {
     return b->size;
 }
 
-inline IOBuf::Block* create_block(const size_t block_size) {
-    if (block_size > 0xFFFFFFFFULL) {
-        LOG(FATAL) << "block_size=" << block_size << " is too large";
-        return NULL;
-    }
-    char* mem = (char*)iobuf::blockmem_allocate(block_size);
-    if (mem == NULL) {
-        return NULL;
-    }
-    return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
-                                  block_size - sizeof(IOBuf::Block));
-}
-
-inline IOBuf::Block* create_block() {
-    return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
-}
-
 inline IOBuf::Block* create_block_aligned(size_t block_size, size_t alignment) 
{
     if (block_size > 0xFFFFFFFFULL) {
         LOG(FATAL) << "block_size=" << block_size << " is too large";
@@ -376,28 +246,11 @@ inline IOBuf::Block* create_block_aligned(size_t 
block_size, size_t alignment) {
 }
 
 // === Share TLS blocks between appending operations ===
-// Max number of blocks in each TLS. This is a soft limit namely
-// release_tls_block_chain() may exceed this limit sometimes.
-const int MAX_BLOCKS_PER_THREAD = 8;
-
-inline int max_blocks_per_thread() {
-    // If IOBufProfiler is enabled, do not cache blocks in TLS.
-    return IsIOBufProfilerEnabled() ? 0 : MAX_BLOCKS_PER_THREAD;
-}
-
-struct TLSData {
-    // Head of the TLS block chain.
-    IOBuf::Block* block_head;
-    
-    // Number of TLS blocks
-    int num_blocks;
-    
-    // True if the remote_tls_block_chain is registered to the thread.
-    bool registered;
-};
 
 static __thread TLSData g_tls_data = { NULL, 0, false };
 
+// Used in release_tls_block()
+TLSData* get_g_tls_data() { return &g_tls_data; }
 // Used in UT
 IOBuf::Block* get_tls_block_head() { return g_tls_data.block_head; }
 int get_tls_block_count() { return g_tls_data.num_blocks; }
@@ -407,6 +260,14 @@ int get_tls_block_count() { return g_tls_data.num_blocks; }
 // of appending functions in IOPortal may be lowered.
 static butil::static_atomic<size_t> g_num_hit_tls_threshold = 
BUTIL_STATIC_ATOMIC_INIT(0);
 
+void inc_g_num_hit_tls_threshold() {
+    g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
+}
+
+void dec_g_num_hit_tls_threshold() {
+    g_num_hit_tls_threshold.fetch_sub(1, butil::memory_order_relaxed);
+}
+
 // Called in UT.
 void remove_tls_block_chain() {
     TLSData& tls_data = g_tls_data;
@@ -458,28 +319,6 @@ IOBuf::Block* share_tls_block() {
     return new_block;
 }
 
-// Return one block to TLS.
-inline void release_tls_block(IOBuf::Block* b) {
-    if (!b) {
-        return;
-    }
-    TLSData& tls_data = g_tls_data;
-    if (b->full()) {
-        b->dec_ref();
-    } else if (tls_data.num_blocks >= max_blocks_per_thread()) {
-        b->dec_ref();
-        g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
-    } else {
-        b->u.portal_next = tls_data.block_head;
-        tls_data.block_head = b;
-        ++tls_data.num_blocks;
-        if (!tls_data.registered) {
-            tls_data.registered = true;
-            butil::thread_atexit(remove_tls_block_chain);
-        }
-    }
-}
-
 // Return chained blocks to TLS.
 // NOTE: b MUST be non-NULL and all blocks linked SHOULD not be full.
 void release_tls_block_chain(IOBuf::Block* b) {
@@ -2262,4 +2101,4 @@ bool IOBufBytesIterator::forward_one_block(const void** 
data, size_t* size) {
 
 void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n) 
{
     return butil::iobuf::cp(dest, src, n);
-}
+} // namespace butil
\ No newline at end of file
diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h
index c397a264..239e82d9 100644
--- a/src/butil/iobuf.h
+++ b/src/butil/iobuf.h
@@ -64,6 +64,8 @@ friend class IOBufAsZeroCopyInputStream;
 friend class IOBufAsZeroCopyOutputStream;
 friend class IOBufBytesIterator;
 friend class IOBufCutter;
+friend class SingleIOBuf;
+
 public:
     static const size_t DEFAULT_BLOCK_SIZE = 8192;
     static const size_t INITIAL_CAP = 32; // must be power of 2
@@ -773,4 +775,4 @@ inline void swap(butil::IOBuf& a, butil::IOBuf& b) {
 
 #include "butil/iobuf_inl.h"
 
-#endif  // BUTIL_IOBUF_H
+#endif  // BUTIL_IOBUF_H
\ No newline at end of file
diff --git a/src/butil/iobuf_inl.h b/src/butil/iobuf_inl.h
index c49896a5..6b1f8751 100644
--- a/src/butil/iobuf_inl.h
+++ b/src/butil/iobuf_inl.h
@@ -24,10 +24,26 @@
 #ifndef BUTIL_IOBUF_INL_H
 #define BUTIL_IOBUF_INL_H
 
+#include "butil/atomicops.h"                // butil::atomic
+#include "butil/thread_local.h"             // thread_atexit
+#include "butil/logging.h"                  // CHECK, LOG
+
 void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n);
 
 namespace butil {
 
+using UserDataDeleter = std::function<void(void*)>;
+
+struct UserDataExtension {
+    UserDataDeleter deleter;
+};
+
+bool IsIOBufProfilerSamplable();
+void SubmitIOBufSample(IOBuf::Block* block, int64_t ref);
+
+const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 1 << 0;
+const uint16_t IOBUF_BLOCK_FLAGS_SAMPLED = 1 << 1;
+
 inline ssize_t IOBuf::cut_into_file_descriptor(int fd, size_t size_hint) {
     return pcut_into_file_descriptor(fd, -1, size_hint);
 }
@@ -425,6 +441,212 @@ inline size_t IOBufBytesIterator::forward(size_t n) {
     return nc;
 }
 
+// Used by max_blocks_per_thread()
+bool IsIOBufProfilerEnabled();
+
+namespace iobuf {
+void inc_g_nblock();
+void dec_g_nblock();
+
+void inc_g_blockmem();
+void dec_g_blockmem();
+
+void inc_g_num_hit_tls_threshold();
+void dec_g_num_hit_tls_threshold();
+
+// Function pointers to allocate or deallocate memory for a IOBuf::Block
+extern void* (*blockmem_allocate)(size_t);
+extern void  (*blockmem_deallocate)(void*);
+
+} // namespace iobuf
+
+struct IOBuf::Block {
+    butil::atomic<int> nshared;
+    uint16_t flags;
+    uint16_t abi_check;  // original cap, never be zero.
+    uint32_t size;
+    uint32_t cap;
+    // When flag is 0, portal_next is valid.
+    // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data_meta is valid.
+    union {
+        Block* portal_next;
+        uint64_t data_meta;
+    } u;
+    // When flag is 0, data points to `size` bytes starting at 
`(char*)this+sizeof(Block)'
+    // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data points to the 
user data and
+    // the deleter is put in UserDataExtension at `(char*)this+sizeof(Block)'
+    char* data;
+        
+    Block(char* data_in, uint32_t data_size)
+        : nshared(1)
+        , flags(0)
+        , abi_check(0)
+        , size(0)
+        , cap(data_size)
+        , u({NULL})
+        , data(data_in) {
+        iobuf::inc_g_nblock();
+        iobuf::inc_g_blockmem();
+        if (is_samplable()) {
+            SubmitIOBufSample(this, 1);
+        }
+    }
+
+    Block(char* data_in, uint32_t data_size, UserDataDeleter deleter)
+        : nshared(1)
+        , flags(IOBUF_BLOCK_FLAGS_USER_DATA)
+        , abi_check(0)
+        , size(data_size)
+        , cap(data_size)
+        , u({0})
+        , data(data_in) {
+        auto ext = new (get_user_data_extension()) UserDataExtension();
+        ext->deleter = std::move(deleter);
+        if (is_samplable()) {
+            SubmitIOBufSample(this, 1);
+        }
+    }
+
+    // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
+    UserDataExtension* get_user_data_extension() {
+        char* p = (char*)this;
+        return (UserDataExtension*)(p + sizeof(Block));
+    }
+
+    inline void check_abi() {
+#ifndef NDEBUG
+    if (abi_check != 0) {
+        LOG(FATAL) << "Your program seems to wrongly contain two "
+            "ABI-incompatible implementations of IOBuf";
+    }
+#endif
+}
+
+    void inc_ref() {
+        check_abi();
+        nshared.fetch_add(1, butil::memory_order_relaxed);
+        if (sampled()) {
+            SubmitIOBufSample(this, 1);
+        }
+    }
+        
+    void dec_ref() {
+        check_abi();
+        if (sampled()) {
+            SubmitIOBufSample(this, -1);
+        }
+        if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
+            butil::atomic_thread_fence(butil::memory_order_acquire);
+            if (!is_user_data()) {
+                iobuf::dec_g_nblock();
+                iobuf::dec_g_blockmem();
+                this->~Block();
+                iobuf::blockmem_deallocate(this);
+            } else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
+                auto ext = get_user_data_extension();
+                ext->deleter(data);
+                ext->~UserDataExtension();
+                this->~Block();
+                free(this);
+            }
+        }
+    }
+
+    int ref_count() const {
+        return nshared.load(butil::memory_order_relaxed);
+    }
+
+    bool full() const { return size >= cap; }
+    size_t left_space() const { return cap - size; }
+
+private:
+    bool is_samplable() {
+        if (IsIOBufProfilerSamplable()) {
+            flags |= IOBUF_BLOCK_FLAGS_SAMPLED;
+            return true;
+        }
+        return false;
+    }
+
+    bool sampled() const {
+        return flags & IOBUF_BLOCK_FLAGS_SAMPLED;
+    }
+
+    bool is_user_data() const {
+        return flags & IOBUF_BLOCK_FLAGS_USER_DATA;
+    }
+};
+
+namespace iobuf {
+struct TLSData {
+    // Head of the TLS block chain.
+    IOBuf::Block* block_head;
+    
+    // Number of TLS blocks
+    int num_blocks;
+    
+    // True if the remote_tls_block_chain is registered to the thread.
+    bool registered;
+};
+
+// Max number of blocks in each TLS. This is a soft limit namely
+// release_tls_block_chain() may exceed this limit sometimes.
+const int MAX_BLOCKS_PER_THREAD = 8;
+
+inline int max_blocks_per_thread() {
+    // If IOBufProfiler is enabled, do not cache blocks in TLS.
+    return IsIOBufProfilerEnabled() ? 0 : MAX_BLOCKS_PER_THREAD;
+}
+
+TLSData* get_g_tls_data();
+void remove_tls_block_chain();
+
+IOBuf::Block* acquire_tls_block();
+
+// Return one block to TLS.
+inline void release_tls_block(IOBuf::Block* b) {
+    if (!b) {
+        return;
+    }
+    TLSData *tls_data = get_g_tls_data();
+    if (b->full()) {
+        b->dec_ref();
+    } else if (tls_data->num_blocks >= max_blocks_per_thread()) {
+        b->dec_ref();
+        // g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
+        inc_g_num_hit_tls_threshold();
+    } else {
+        b->u.portal_next = tls_data->block_head;
+        tls_data->block_head = b;
+        ++tls_data->num_blocks;
+        if (!tls_data->registered) {
+            tls_data->registered = true;
+            butil::thread_atexit(remove_tls_block_chain);
+        }
+    }
+}
+
+inline IOBuf::Block* create_block(const size_t block_size) {
+    if (block_size > 0xFFFFFFFFULL) {
+        LOG(FATAL) << "block_size=" << block_size << " is too large";
+        return NULL;
+    }
+    char* mem = (char*)iobuf::blockmem_allocate(block_size);
+    if (mem == NULL) {
+        return NULL;
+    }
+    return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
+                                  block_size - sizeof(IOBuf::Block));
+}
+
+inline IOBuf::Block* create_block() {
+    return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
+}
+
+void* cp(void *__restrict dest, const void *__restrict src, size_t n);
+
+};  // namespace iobuf;
+
 }  // namespace butil
 
-#endif  // BUTIL_IOBUF_INL_H
+#endif  // BUTIL_IOBUF_INL_H
\ No newline at end of file
diff --git a/src/butil/single_iobuf.cpp b/src/butil/single_iobuf.cpp
new file mode 100644
index 00000000..c51e8fff
--- /dev/null
+++ b/src/butil/single_iobuf.cpp
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// SingleIOBuf - A continuous zero-copied buffer
+
+#include "butil/logging.h"                  // CHECK, LOG
+#include "butil/iobuf.h"
+#include "butil/iobuf_inl.h"
+#include "butil/single_iobuf.h"
+
+namespace butil {
+
+SingleIOBuf::SingleIOBuf()
+    : _cur_block(NULL)
+    , _block_size(0) {
+    _cur_ref.offset = 0;
+    _cur_ref.length = 0;
+    _cur_ref.block = NULL;
+}
+
+SingleIOBuf::SingleIOBuf(const IOBuf::BlockRef& ref) {
+    _cur_block = NULL;
+    _block_size = 0;
+    if (ref.block) {
+        _cur_ref = ref;
+        _cur_ref.block->inc_ref();
+    }
+}
+
+SingleIOBuf::SingleIOBuf(const SingleIOBuf& other) {
+    _cur_block = NULL;
+    _block_size = 0;
+    if (other._cur_ref.block != NULL) {
+        _cur_ref = other._cur_ref;
+        _cur_ref.block->inc_ref();
+    }
+}
+
+SingleIOBuf::~SingleIOBuf() {
+    reset();
+}
+
+SingleIOBuf& SingleIOBuf::operator=(const SingleIOBuf& rhs) {
+    reset();
+    _block_size = 0;
+    if (rhs._cur_ref.block != NULL) {
+        _cur_ref = rhs._cur_ref;
+        _cur_ref.block->inc_ref();
+    }
+    return *this;
+}
+
+void SingleIOBuf::swap(SingleIOBuf& other) {
+    if (this == &other) {
+        return;
+    }
+
+    IOBuf::BlockRef tmp_ref = _cur_ref;
+    _cur_ref = other._cur_ref;
+    other._cur_ref = tmp_ref;
+    IOBuf::Block* tmp_ptr = other._cur_block;
+    other._cur_block = _cur_block;
+    _cur_block = tmp_ptr;
+    uint32_t tmp_size = other._block_size;
+    other._block_size = _block_size;
+    _block_size = tmp_size;
+}
+
+void* SingleIOBuf::allocate(uint32_t size) {
+    IOBuf::Block* b = alloc_block_by_size(size);
+    if (!b) {
+        return NULL;
+    }
+    _cur_ref.offset = b->size;
+    _cur_ref.length = size;
+    _cur_ref.block = b;
+    b->size += size;
+    b->inc_ref();
+    return b->data + _cur_ref.offset;
+}
+
+void SingleIOBuf::deallocate(void* p) {
+    if (_cur_ref.block) {
+        if (_cur_ref.block->data + _cur_ref.offset == (char*)p) { 
+            reset();
+        }
+    }
+}
+
+IOBuf::Block* SingleIOBuf::alloc_block_by_size(uint32_t data_size) {
+    if (_cur_block != NULL) {
+        if (_cur_block->left_space() >= data_size) {
+            return _cur_block;
+        } else {
+            _cur_block->dec_ref();
+            _cur_block = NULL;
+        }
+    }
+    uint32_t total_size = data_size + sizeof(IOBuf::Block);
+    if (total_size <= IOBuf::DEFAULT_BLOCK_SIZE) {
+        _cur_block = iobuf::acquire_tls_block();
+        if (_cur_block != NULL) {
+            if (_cur_block->left_space() >= data_size) {
+                return _cur_block;
+            } else {
+                _cur_block->dec_ref();
+                _cur_block = NULL;
+            }
+        }
+        _cur_block = iobuf::create_block();
+    } else {
+        _cur_block = iobuf::create_block(total_size);
+        _block_size = total_size;
+    }
+    if (BAIDU_UNLIKELY(!_cur_block)) {
+        errno = ENOMEM;
+        _block_size = 0;
+        return NULL;
+    }
+    return _cur_block;
+}
+
+void SingleIOBuf::memcpy_downward(void* old_p, uint32_t old_size,
+                    void* new_p, uint32_t new_size,
+                    uint32_t in_use_back, uint32_t in_use_front) {
+    memcpy((u_char*)new_p + new_size - in_use_back, (u_char*)old_p + old_size 
- in_use_back,
+            in_use_back);
+    memcpy(new_p, old_p, in_use_front);
+}
+
+void* SingleIOBuf::reallocate_downward(uint32_t new_size, uint32_t in_use_back,
+                                        uint32_t in_use_front) {
+    IOBuf::BlockRef& ref = _cur_ref;
+    if (BAIDU_UNLIKELY(new_size <= ref.length)) {
+        LOG(ERROR) << "invalid new size:" << new_size;
+        errno = EINVAL;
+        return NULL;
+    }
+    if (BAIDU_UNLIKELY(ref.block == NULL)) {
+        LOG(ERROR) << "SingleIOBuf reallocate_downward failed. Block cannot be 
null!";
+        errno = EINVAL;
+        return NULL;
+    }
+    char* old_p = ref.block->data + ref.offset;
+    uint32_t old_size = ref.length;
+    IOBuf::Block* b = alloc_block_by_size(new_size);
+    if (!b) {
+        return NULL;
+    }
+    char* new_p = b->data + b->size;
+    memcpy_downward(old_p, old_size,
+                    new_p, new_size,
+                    in_use_back, in_use_front);
+    ref.block->dec_ref();
+    _cur_ref.offset = b->size;
+    _cur_ref.length = new_size;
+    _cur_ref.block = b;
+    b->size += new_size;
+    b->inc_ref();
+    return new_p;
+}
+
+const void* SingleIOBuf::get_begin() const {
+    if (_cur_ref.block) {
+        return _cur_ref.block->data + _cur_ref.offset;
+    }
+    return NULL;
+}
+
+uint32_t SingleIOBuf::get_length() const {
+    return _cur_ref.length;
+}
+
+void SingleIOBuf::reset() {
+    if (_cur_block) {
+        if (_block_size == 0) {
+            iobuf::release_tls_block(_cur_block);
+        } else {
+            _cur_block->dec_ref();
+            _block_size = 0;
+        }
+        _cur_block = NULL;
+    }
+    if (_cur_ref.block != NULL) {
+        _cur_ref.block->dec_ref();
+    }
+    _cur_ref.offset = 0;
+    _cur_ref.length = 0;
+    _cur_ref.block = NULL;
+}
+
+bool SingleIOBuf::assign(const IOBuf& buf, uint32_t msg_size) {
+    if (BAIDU_UNLIKELY(buf.length() < msg_size)) {
+        LOG(ERROR) << "Fail to dump_mult_iobuf msg_size:" << msg_size
+            << " from source iobuf of size:" << buf.length();
+        return false;
+    }
+    if (BAIDU_UNLIKELY(msg_size == 0)) {
+        return true;
+    }
+    
+    const IOBuf::BlockRef& ref = buf._front_ref();
+    if (ref.length >= msg_size) {
+        reset();
+        _cur_ref.offset = ref.offset;
+        _cur_ref.length = msg_size;
+        _cur_ref.block = ref.block;
+        _cur_ref.block->inc_ref();
+        return true;
+    } else {
+        IOBuf::Block* b = alloc_block_by_size(msg_size);
+        if (!b) {
+            return false;
+        }
+        reset();
+        char* out = b->data + b->size;
+        const size_t nref = buf.backing_block_num();
+        uint32_t last_len = msg_size;
+        for (size_t i = 0; i < nref && last_len > 0; ++i) {
+            const IOBuf::BlockRef& r = buf._ref_at(i);
+            uint32_t n = std::min(r.length, last_len);
+            iobuf::cp(out, r.block->data + r.offset, n);
+            last_len -= n;
+            out += n;
+        }
+        _cur_ref.offset = b->size;
+        _cur_ref.length = msg_size;
+        _cur_ref.block = b;
+        b->size += msg_size;
+        _cur_ref.block->inc_ref();
+        return true;
+    }
+}
+
+void SingleIOBuf::append_to(IOBuf* buf) const {
+    if (buf && _cur_ref.block) {
+        buf->_push_back_ref(_cur_ref);
+    }
+}
+
+int SingleIOBuf::assign_user_data(void* data, size_t size, 
std::function<void(void*)> deleter) {
+    if (size > 0xFFFFFFFFULL - 100) {
+        LOG(FATAL) << "data_size=" << size << " is too large";
+        return -1;
+    }
+    char* mem = (char*)malloc(sizeof(IOBuf::Block) + 
sizeof(UserDataExtension));
+    if (mem == NULL) {
+        return -1;
+    }
+    if (deleter == NULL) {
+        deleter = ::free;
+    }
+    reset();
+    IOBuf::Block* b = new (mem) IOBuf::Block((char*)data, size, deleter);
+    _cur_ref.offset = 0;
+    _cur_ref.length = b->cap;
+    _cur_ref.block = b;
+    return 0;
+}
+
+void SingleIOBuf::target_block_inc_ref(void* b) {
+    IOBuf::Block* block = (IOBuf::Block*)b;
+    block->inc_ref();
+}
+
+void SingleIOBuf::target_block_dec_ref(void* b) {
+    IOBuf::Block* block = (IOBuf::Block*)b;
+    block->dec_ref();
+}
+
+} // namespace butil
\ No newline at end of file
diff --git a/src/butil/single_iobuf.h b/src/butil/single_iobuf.h
new file mode 100644
index 00000000..9c4237aa
--- /dev/null
+++ b/src/butil/single_iobuf.h
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// SingleIOBuf - A continuous zero-copied buffer
+
+#ifndef BUTIL_SINGLE_IOBUF_H
+#define BUTIL_SINGLE_IOBUF_H
+
+#include "butil/iobuf.h"
+#include "butil/iobuf_inl.h"
+
+namespace butil {
+
+// SingleIOBuf is a lightweight buffer that manages a single IOBuf::Block.
+// It always ensures that the underlying memory is contiguous and 
+// avoids unnecessary memory copies.
+// It is primarily used to efficiently serialize and deserialize 
+// RPC requests in flatbuffers.
+class SingleIOBuf {
+public:
+    SingleIOBuf();
+    ~SingleIOBuf();
+    SingleIOBuf(const IOBuf::BlockRef& ref);
+    SingleIOBuf(const SingleIOBuf& other);
+    SingleIOBuf& operator=(const SingleIOBuf& rhs);
+    void swap(SingleIOBuf& other);
+
+    // Allocates a contiguous memory region of the specified size.
+    // Returns a pointer to the start of allocated space within the block.
+    void* allocate(uint32_t size);
+    // Deallocates the block if the given pointer matches its starting address.
+    void deallocate(void* p);
+
+    // Reallocates the current buffer to a larger size.
+    // in_use_back indicates the memory used by message data.
+    // in_use_front indicates the memory used by metadata.
+    void* reallocate_downward(uint32_t new_size,
+                            uint32_t in_use_back,
+                            uint32_t in_use_front);
+
+    // Returns a pointer to the beginning of the current buffer data.
+    const void* get_begin() const;
+    
+    // Get the length of the SingleIOBuf.
+    uint32_t get_length() const;
+    
+    // Reset the SingleIOBuf by release the block and clear the BlockRef.
+    void reset();
+
+    // Assigns data from the given IOBuf to this SingleIOBuf.
+    // If the source contains multiple BlockRef segments, 
+    // they will be concatenated into a single contiguous block.
+    bool assign(const IOBuf& buf, uint32_t msg_size);
+    
+    // Appends the current block of the SingleIOBuf to the given IOBuf.
+    void append_to(IOBuf* buf) const;
+
+    const IOBuf::BlockRef& get_cur_ref() const { return _cur_ref; }
+    void* get_cur_block() { return (void*)_cur_ref.block; }
+
+    // Returns the number of underlying blocks in the SingleIOBuf,
+    // which is either 1 if a block exists or 0 if none.
+    size_t backing_block_num() const { return _cur_ref.block != NULL ? 1 : 0; }
+    
+    // Assigns user date to the SingleIOBuf,
+    // updates _cur_ref to point to the block storing the data.
+    int assign_user_data(void* data, size_t size, std::function<void(void*)> 
deleter);
+    
+    // Increments the reference count of the specified target block.
+    static void target_block_inc_ref(void* b);  
+    // Decrements the reference count of the specified target block.
+    static void target_block_dec_ref(void* b);
+
+protected:
+    // Copy from the old buffer to the corresponding positions in the new 
block.
+    void memcpy_downward(void* old_p, uint32_t old_size,
+                        void* new_p, uint32_t new_size,
+                        uint32_t in_use_back, uint32_t in_use_front);
+
+    // Allocates and returns a memory block large enough to hold the specified 
data size, 
+    // reusing an existing block when possible or creating a new one if 
necessary.
+    IOBuf::Block* alloc_block_by_size(uint32_t data_size);
+    
+private:
+    // Current block the SingleIOBuf used to allocate memory.
+    IOBuf::Block* _cur_block;
+    // Current block total size, include sizeof(IOBuf::Block).
+    uint32_t _block_size;
+    // Point to the block storing the data.
+    IOBuf::BlockRef _cur_ref;
+};
+
+}  // namespace butil
+
+#endif
\ No newline at end of file
diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp
index a919cbda..679cdfe7 100644
--- a/test/iobuf_unittest.cpp
+++ b/test/iobuf_unittest.cpp
@@ -29,6 +29,7 @@
 #include <butil/time.h>                 // Timer
 #include <butil/fd_utility.h>           // make_non_blocking
 #include <butil/iobuf.h>
+#include <butil/single_iobuf.h>
 #include <butil/logging.h>
 #include <butil/fd_guard.h>
 #include <butil/errno.h>
@@ -1890,4 +1891,59 @@ TEST_F(IOBufTest, reserve_aligned) {
     }
 }
 
+TEST_F(IOBufTest, single_iobuf) {
+    butil::IOBuf buf1;
+    // It will be freed by IOBuf.
+    char *usr_str = (char *)malloc(16);
+    memset(usr_str, 0, 16);
+    char src_str[] = "abcdefgh12345678";
+    size_t total_len = sizeof(src_str);
+    strncpy(usr_str, src_str + 8, total_len - 8);
+    buf1.append(src_str, 8);
+    buf1.append_user_data(usr_str, total_len - 8, NULL);
+    ASSERT_EQ(2, buf1.backing_block_num());
+    butil::SingleIOBuf sbuf;
+    ASSERT_EQ(0, sbuf.backing_block_num());
+    sbuf.assign(buf1, total_len);
+    ASSERT_EQ(1, sbuf.backing_block_num());
+    size_t s_len = sbuf.get_length();
+    ASSERT_EQ(s_len, total_len);
+    const char* str = (const char*) sbuf.get_begin();
+    int ret = strcmp(str, src_str);
+    ASSERT_EQ(0, ret);
+    butil::IOBuf buf2;
+    sbuf.append_to(&buf2);
+    ASSERT_EQ(buf2.length(), total_len);
+    butil::SingleIOBuf sbuf2;
+    sbuf2.swap(sbuf);
+    ASSERT_EQ(sbuf.get_length(), 0);
+    ASSERT_EQ(sbuf2.get_length(), total_len);
+    sbuf2.reset();
+    ASSERT_EQ(0, sbuf2.get_length());
+    
+    void* buf = sbuf.allocate(1024);
+    ASSERT_TRUE(NULL != buf);
+    buf = sbuf.reallocate_downward(16384, 0, 0);
+    ASSERT_TRUE(NULL != buf);
+    s_len = sbuf.get_length();
+    ASSERT_EQ(16384, s_len);
+
+    butil::IOBuf::BlockRef ref = sbuf.get_cur_ref();
+    butil::SingleIOBuf sbuf3(ref);
+    s_len = sbuf3.get_length();
+    ASSERT_EQ(16384, s_len);
+    sbuf.deallocate(buf);
+
+    errno = 0;
+    void *null_buf = sbuf3.reallocate_downward(s_len - 1, 0, 0);
+    ASSERT_EQ(null_buf, nullptr);
+
+    uint32_t old_size = sbuf3.get_length();
+    void *p = sbuf3.reallocate_downward(old_size + 16, 0, old_size); 
+    ASSERT_TRUE(p != nullptr);
+    old_size = sbuf3.get_length();
+    p = sbuf3.reallocate_downward(old_size + 16, old_size, 0);
+    ASSERT_TRUE(p != nullptr);
+}
+
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to