This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 3eb40413 iobuf support reserve_aligned (#2942)
3eb40413 is described below
commit 3eb40413b2e3b4747a101dae5d7dcaba9b48a324
Author: Yang,Liming <[email protected]>
AuthorDate: Thu Apr 24 15:37:26 2025 +0800
iobuf support reserve_aligned (#2942)
---
src/butil/iobuf.cpp | 56 ++++++++++++++++++++++++++
src/butil/iobuf.h | 11 +++++
test/iobuf_unittest.cpp | 105 ++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 172 insertions(+)
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index 8895fb16..59ad61a2 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -355,6 +355,23 @@ inline IOBuf::Block* create_block() {
return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
}
+// Allocates `block_size` bytes with iobuf::blockmem_allocate and constructs an
+// IOBuf::Block at the start of that memory. The payload handed to the Block
+// begins at the first address after the header that is a multiple of
+// `alignment`, and its capacity is rounded down to a multiple of `alignment`.
+//
+// `alignment` must be a non-zero power of two, and `block_size` must cover the
+// header plus at least `alignment` bytes. The capacity may still round down to
+// zero when less than one whole aligned unit is left after padding.
+//
+// Returns NULL when an argument is rejected or the allocation fails.
+inline IOBuf::Block* create_block_aligned(size_t block_size, size_t alignment) {
+    if (block_size > 0xFFFFFFFFULL) {
+        LOG(FATAL) << "block_size=" << block_size << " is too large";
+        return NULL;
+    }
+    // The mask arithmetic below only works for a non-zero power of two.
+    if (alignment == 0 || (alignment & (alignment - 1)) != 0) {
+        LOG(ERROR) << "alignment=" << alignment << " is not a power of two";
+        return NULL;
+    }
+    // Padding never exceeds alignment - 1 bytes, so requiring `alignment`
+    // bytes behind the header guarantees the capacity computation below
+    // cannot wrap around.
+    if (block_size < sizeof(IOBuf::Block) ||
+        block_size - sizeof(IOBuf::Block) < alignment) {
+        LOG(ERROR) << "block_size=" << block_size
+                   << " is too small for alignment=" << alignment;
+        return NULL;
+    }
+    char* mem = (char*)iobuf::blockmem_allocate(block_size);
+    if (mem == NULL) {
+        return NULL;
+    }
+    char* const data = mem + sizeof(IOBuf::Block);
+    const size_t mask = alignment - 1;
+    // Bytes to skip so that the payload starts on an aligned address.
+    const size_t padding =
+        (alignment - (reinterpret_cast<uintptr_t>(data) & mask)) & mask;
+    // Keep only whole aligned units of what remains.
+    const size_t capacity =
+        (block_size - sizeof(IOBuf::Block) - padding) & ~mask;
+    return new (mem) IOBuf::Block(data + padding, capacity);
+}
+
// === Share TLS blocks between appending operations ===
// Max number of blocks in each TLS. This is a soft limit namely
// release_tls_block_chain() may exceed this limit sometimes.
@@ -1785,6 +1802,45 @@ void IOPortal::return_cached_blocks_impl(Block* b) {
iobuf::release_tls_block_chain(b);
}
+// Reserves `count` bytes, rounded up to a multiple of the alignment given at
+// construction, in freshly created aligned blocks appended to this buffer.
+//
+// May be called only once per object; a second call logs an error and fails.
+// Returns the area describing the reserved bytes, or INVALID_AREA when:
+//   - reserve() was already called on this object,
+//   - the alignment is zero or not a power of two,
+//   - the rounded count is zero,
+//   - a block could not be created (refs pushed before the failure stay in
+//     the buffer).
+IOBuf::Area IOReserveAlignedBuf::reserve(size_t count) {
+    if (_reserved) {
+        LOG(ERROR) << "Already call reserved";
+        return INVALID_AREA;
+    }
+    _reserved = true;
+    // Zero must be rejected explicitly: it would otherwise pass the bit test
+    // and turn the rounding mask below into garbage.
+    const bool is_power_of_two =
+        _alignment != 0 && (_alignment & (_alignment - 1)) == 0;
+    if (!is_power_of_two) {
+        LOG(ERROR) << "Invalid alignment, must power of two";
+        return INVALID_AREA;
+    }
+    const size_t mask = _alignment - 1;
+    count = (count + mask) & ~mask;
+    // Twice the alignment (at least 8KB) behind the header leaves room for at
+    // least one whole aligned unit whatever padding the allocation needs.
+    // The explicit template argument keeps this compiling where size_t is not
+    // unsigned long.
+    const size_t block_size =
+        std::max<size_t>(_alignment, 4096) * 2 + sizeof(IOBuf::Block);
+    IOBuf::Area result = INVALID_AREA;
+    size_t total_nc = 0;
+    while (total_nc < count) {
+        IOBuf::Block* const b =
+            iobuf::create_block_aligned(block_size, _alignment);
+        if (BAIDU_UNLIKELY(!b)) {
+            LOG(ERROR) << "Create block failed";
+            // Never hand back an area that claims `count` bytes when only
+            // part of them was reserved.
+            return INVALID_AREA;
+        }
+        const size_t nc = std::min<size_t>(count - total_nc, b->left_space());
+        const IOBuf::BlockRef r = {(uint32_t)b->size, (uint32_t)nc, b};
+        _push_back_ref(r);
+        // aligned block is not from tls, release block ref
+        b->dec_ref();
+        if (total_nc == 0) {
+            // Encode the area at first time. Notice that the pushed ref may
+            // be merged with existing ones.
+            result = make_area(_ref_num() - 1, _back_ref().length - nc, count);
+        }
+        total_nc += nc;
+        b->size += nc;
+    }
+    return result;
+}
+
//////////////// IOBufCutter ////////////////
IOBufCutter::IOBufCutter(butil::IOBuf* buf)
diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h
index 978f9758..3682f61e 100644
--- a/src/butil/iobuf.h
+++ b/src/butil/iobuf.h
@@ -489,6 +489,17 @@ private:
Block* _block;
};
+// An IOBuf with a reserve() that hands out space in blocks whose data start
+// and length are multiples of `alignment`.
+// `alignment` is stored unchecked here; reserve() rejects values that are not
+// a power of two. reserve() succeeds at most once per object.
+class IOReserveAlignedBuf : public IOBuf {
+public:
+    IOReserveAlignedBuf(size_t alignment) : _alignment(alignment) {}
+
+    // Reserves `count` bytes rounded up to a multiple of the alignment.
+    // Returns INVALID_AREA on failure or when called a second time.
+    Area reserve(size_t count);
+
+private:
+    // Alignment requested at construction, in bytes.
+    size_t _alignment;
+    // Set by the first reserve() call, whether or not it succeeded.
+    bool _reserved = false;
+};
+
// Specialized utility to cut from IOBuf faster than using corresponding
// methods in IOBuf.
// Designed for efficiently parsing data from IOBuf.
diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp
index b45a174c..a919cbda 100644
--- a/test/iobuf_unittest.cpp
+++ b/test/iobuf_unittest.cpp
@@ -22,6 +22,7 @@
#include <fcntl.h> // O_RDONLY
#include <stdlib.h>
#include <memory>
+#include <cstring>
#include <butil/files/temp_file.h> // TempFile
#include <butil/containers/flat_map.h>
#include <butil/macros.h>
@@ -1785,4 +1786,108 @@ TEST_F(IOBufTest, acquire_tls_block) {
ASSERT_NE(butil::iobuf::block_cap(b), butil::iobuf::block_size(b));
}
+// Reserves `count` bytes from a fresh IOReserveAlignedBuf built with
+// `alignment`, then walks the buffer chunk by chunk and checks that every
+// chunk starts on an `alignment` boundary, has a length that is a multiple of
+// `alignment`, and that the lengths add up to `expected_total`.
+static void check_reserve_aligned(size_t alignment, size_t count,
+                                  size_t expected_total) {
+    butil::IOReserveAlignedBuf buf(alignment);
+    const auto area = buf.reserve(count);
+    ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
+    butil::IOBufAsZeroCopyInputStream wrapper(buf);
+    const void* data = NULL;
+    int size = 0;
+    size_t total_size = 0;
+    while (wrapper.Next(&data, &size)) {
+        ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % alignment, 0u);
+        ASSERT_EQ(static_cast<size_t>(size) % alignment, 0u);
+        total_size += static_cast<size_t>(size);
+    }
+    ASSERT_EQ(total_size, expected_total);
+}
+
+TEST_F(IOBufTest, reserve_aligned) {
+    // Requests are rounded up to a whole number of alignment units.
+    ASSERT_NO_FATAL_FAILURE(check_reserve_aligned(16, 1024, 1024));
+    ASSERT_NO_FATAL_FAILURE(check_reserve_aligned(4096, 1024, 4096));
+    ASSERT_NO_FATAL_FAILURE(check_reserve_aligned(4096, 8191, 8192));
+    ASSERT_NO_FATAL_FAILURE(
+        check_reserve_aligned(4096, 4096 * 10 - 1, 4096 * 10));
+    // Chunks are checked against the requested 8192, not a weaker 4096.
+    ASSERT_NO_FATAL_FAILURE(
+        check_reserve_aligned(8192, 4096 * 10 + 1, 4096 * 10 + 8192));
+
+    // An alignment that is not a power of two is rejected.
+    {
+        butil::IOReserveAlignedBuf buf(4095);
+        auto area = buf.reserve(4096);
+        ASSERT_EQ(area, butil::IOBuf::INVALID_AREA);
+    }
+    // A zero alignment never yields a usable area.
+    {
+        butil::IOReserveAlignedBuf buf(0);
+        auto area = buf.reserve(4096);
+        ASSERT_EQ(area, butil::IOBuf::INVALID_AREA);
+    }
+    // The second reserve() on the same object fails.
+    {
+        butil::IOReserveAlignedBuf buf(4096);
+        ASSERT_NE(buf.reserve(4096), butil::IOBuf::INVALID_AREA);
+        ASSERT_EQ(buf.reserve(4096), butil::IOBuf::INVALID_AREA);
+    }
+    // Data written through the reserved chunks is read back unchanged.
+    {
+        butil::IOReserveAlignedBuf buf(4096);
+        auto area = buf.reserve(1024 * 1024 * 3);
+        ASSERT_NE(area, butil::IOBuf::INVALID_AREA);
+        butil::IOBufAsZeroCopyInputStream wrapper(buf);
+        const void* data = NULL;
+        int size = 0;
+        int count = 0;
+        size_t total_size = 0;
+        std::string expected;
+        while (wrapper.Next(&data, &size)) {
+            ASSERT_EQ(reinterpret_cast<uintptr_t>(data) % 4096, 0u);
+            ASSERT_EQ(size % 4096, 0);
+            // Cycle through 'A'..'Z' so neighbouring chunks differ while the
+            // fill value always fits in a char.
+            const std::string str(size, static_cast<char>('A' + count % 26));
+            ++count;
+            expected += str;
+            std::memcpy(const_cast<void*>(data), str.data(), str.size());
+            total_size += static_cast<size_t>(size);
+        }
+        ASSERT_EQ(total_size, 3145728u);
+        ASSERT_EQ(expected, buf.to_string());
+    }
+}
+
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]