This is an automated email from the ASF dual-hosted git repository.

gfphoenix78 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new 12b43e38d19 PAX: optimize io read for multiple discrete columns in a 
group
12b43e38d19 is described below

commit 12b43e38d1972aefa7fab2a88560c970ffe7e0c3
Author: Hao Wu <[email protected]>
AuthorDate: Thu Nov 27 15:24:49 2025 +0000

    PAX: optimize io read for multiple discrete columns in a group
    
    When reading multiple discrete columns in a group, the code
    reads the columnar data block by block in synchronous mode,
    so all I/O requests on the columnar data complete one after
    another, which is inefficient.
    
    This commit uses io_uring to submit the I/O requests as a batch,
    allowing the OS to service them in parallel for better throughput.
    
    libaio was also considered, but it brought no improvement in
    our benchmark (without O_DIRECT).
---
 contrib/pax_storage/doc/README.md                  |   1 +
 contrib/pax_storage/src/cpp/cmake/pax.cmake        |   9 +-
 contrib/pax_storage/src/cpp/cmake/pax_format.cmake |   7 +-
 contrib/pax_storage/src/cpp/comm/common_io.h       |  39 ++++++
 contrib/pax_storage/src/cpp/comm/fast_io.cc        | 134 +++++++++++++++++++++
 contrib/pax_storage/src/cpp/comm/fast_io.h         |  88 ++++++++++++++
 contrib/pax_storage/src/cpp/storage/file_system.cc |  10 ++
 contrib/pax_storage/src/cpp/storage/file_system.h  |   2 +
 .../src/cpp/storage/local_file_system.cc           |  22 ++++
 .../src/cpp/storage/orc/orc_format_reader.cc       |  10 +-
 10 files changed, 314 insertions(+), 8 deletions(-)

diff --git a/contrib/pax_storage/doc/README.md 
b/contrib/pax_storage/doc/README.md
index 39077955a0c..43393f2f859 100644
--- a/contrib/pax_storage/doc/README.md
+++ b/contrib/pax_storage/doc/README.md
@@ -46,6 +46,7 @@ PAX will be built with `--enable-pax` when you build the 
Cloudberry. Dependency
 - **CMake**: 3.11 or later
 - **Protobuf**: 3.5.0 or later
 - **ZSTD (libzstd)**: 1.4.0 or later
+- **liburing**: 2.12 or later
 
 Also, you need to run the following command at the top level of the Cloudberry 
source code directory to download the submodules:
 
diff --git a/contrib/pax_storage/src/cpp/cmake/pax.cmake 
b/contrib/pax_storage/src/cpp/cmake/pax.cmake
index 099a66f30d8..528b4e8cafc 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax.cmake
@@ -30,6 +30,7 @@ set(pax_comm_src
     comm/bitmap.cc
     comm/bloomfilter.cc
     comm/byte_buffer.cc
+    comm/fast_io.cc
     comm/guc.cc
     comm/paxc_wrappers.cc
     comm/pax_memory.cc
@@ -173,7 +174,7 @@ add_subdirectory(contrib/tabulate)
 set(pax_target_src  ${PROTO_SRCS} ${pax_storage_src} ${pax_clustering_src} 
${pax_exceptions_src}
   ${pax_access_src} ${pax_comm_src} ${pax_catalog_src} ${pax_vec_src})
 set(pax_target_include ${pax_target_include} ${ZTSD_HEADER} 
${CMAKE_CURRENT_SOURCE_DIR} ${CBDB_INCLUDE_DIR} contrib/tabulate/include)
-set(pax_target_link_libs ${pax_target_link_libs} protobuf zstd z postgres)
+set(pax_target_link_libs ${pax_target_link_libs} protobuf zstd z postgres 
uring)
 if (PAX_USE_LZ4)
   list(APPEND pax_target_link_libs lz4)
 endif()
@@ -207,7 +208,7 @@ endif(VEC_BUILD)
 
 target_include_directories(pax PUBLIC ${pax_target_include})
 target_link_directories(pax PUBLIC ${pax_target_link_directories})
-target_link_libraries(pax PUBLIC ${pax_target_link_libs})
+target_link_libraries(pax PRIVATE ${pax_target_link_libs})
 set_target_properties(pax PROPERTIES
   BUILD_RPATH_USE_ORIGIN ON
   BUILD_WITH_INSTALL_RPATH ON
@@ -233,8 +234,8 @@ if (BUILD_GTEST)
   add_dependencies(test_main ${pax_target_dependencies} gtest gmock)
   target_include_directories(test_main PUBLIC ${pax_target_include} 
${CMAKE_CURRENT_SOURCE_DIR} ${gtest_SOURCE_DIR}/include contrib/cpp-stub/src/ 
contrib/cpp-stub/src_linux/)
 
-  target_link_directories(test_main PUBLIC ${pax_target_link_directories})
-  target_link_libraries(test_main PUBLIC ${pax_target_link_libs} gtest gmock 
postgres)
+  target_link_directories(test_main PRIVATE ${pax_target_link_directories})
+  target_link_libraries(test_main PRIVATE ${pax_target_link_libs} gtest gmock 
postgres)
 endif(BUILD_GTEST)
 
 if(BUILD_GBENCH)
diff --git a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake 
b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
index 5a12185a0e6..8d28e793d27 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
@@ -20,6 +20,7 @@ set(pax_comm_src
     comm/bitmap.cc
     comm/bloomfilter.cc
     comm/byte_buffer.cc
+    comm/fast_io.cc
     comm/guc.cc
     comm/paxc_wrappers.cc
     comm/pax_memory.cc
@@ -108,7 +109,7 @@ set(pax_vec_src ${pax_vec_src}
 endif()
 
 set(pax_target_include ${ZTSD_HEADER} ${CMAKE_CURRENT_SOURCE_DIR} 
${CBDB_INCLUDE_DIR} contrib/tabulate/include)
-set(pax_target_link_libs uuid protobuf zstd z)
+set(pax_target_link_libs uuid protobuf zstd z uring)
 if (PAX_USE_LZ4)
   list(APPEND pax_target_link_libs lz4)
 endif()
@@ -135,7 +136,7 @@ endif(VEC_BUILD)
 add_library(paxformat SHARED ${PROTO_SRCS} ${pax_storage_src} 
${pax_clustering_src} ${pax_exceptions_src} ${pax_comm_src} ${pax_vec_src})
 target_include_directories(paxformat PUBLIC ${pax_target_include})
 target_link_directories(paxformat PUBLIC ${pax_target_link_directories})
-target_link_libraries(paxformat PUBLIC ${pax_target_link_libs})  
+target_link_libraries(paxformat PRIVATE ${pax_target_link_libs})
    
 set_target_properties(paxformat PROPERTIES
   OUTPUT_NAME paxformat)
@@ -196,4 +197,4 @@ install(TARGETS paxformat
 add_executable(paxformat_test paxformat_test.cc)
 target_include_directories(paxformat_test PUBLIC ${pax_target_include} 
${CMAKE_CURRENT_SOURCE_DIR})
 add_dependencies(paxformat_test paxformat)
-target_link_libraries(paxformat_test PUBLIC paxformat postgres)
+target_link_libraries(paxformat_test PRIVATE paxformat postgres)
diff --git a/contrib/pax_storage/src/cpp/comm/common_io.h 
b/contrib/pax_storage/src/cpp/comm/common_io.h
new file mode 100644
index 00000000000..44730376054
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/comm/common_io.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * common_io.h
+ *
+ * IDENTIFICATION
+ *       contrib/pax_storage/src/cpp/comm/common_io.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#pragma once
+#include <cstddef>
+#include <fcntl.h>
+
+namespace pax
+{
+// One positional read: `size` bytes starting at byte `offset` of a file are
+// to be stored in `buffer`. Fields default to an empty request.
+struct IORequest {
+  void* buffer = nullptr;  // destination, at least `size` bytes long
+  size_t size = 0;         // number of bytes to read
+  off_t offset = 0;        // file position of the first byte to read
+};
+} // namespace pax
diff --git a/contrib/pax_storage/src/cpp/comm/fast_io.cc 
b/contrib/pax_storage/src/cpp/comm/fast_io.cc
new file mode 100644
index 00000000000..5b9e593def9
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/comm/fast_io.cc
@@ -0,0 +1,226 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * fast_io.cc
+ *
+ * IDENTIFICATION
+ *       contrib/pax_storage/src/cpp/comm/fast_io.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "fast_io.h"
+
+#include <algorithm>
+#include <cerrno>
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include <unistd.h>
+
+namespace pax
+{
+
+namespace {
+
+// Ring depth used to probe for io_uring support.
+constexpr unsigned kProbeQueueDepth = 128;
+
+// Longest read issued through one SQE (its length field is 32 bits wide);
+// the rest of a larger request is read by follow-up SQEs.
+constexpr size_t kMaxReadChunk = static_cast<size_t>(1) << 30;
+
+// Reads req.size bytes at req.offset into req.buffer with pread(2),
+// retrying on EINTR and on short reads. Returns 0 on success, -errno on a
+// read error, or -EIO if end-of-file is reached before the request is
+// satisfied.
+int PReadFully(int fd, const IORequest &req) {
+  char *dst = static_cast<char *>(req.buffer);
+  size_t done = 0;
+  while (done < req.size) {
+    const ssize_t n = pread(fd, dst + done, req.size - done,
+                            req.offset + static_cast<off_t>(done));
+    if (n > 0) {
+      done += static_cast<size_t>(n);
+    } else if (n == 0) {
+      return -EIO;  // the file ends before the requested range does
+    } else if (errno != EINTR) {
+      return -errno;
+    }
+  }
+  return 0;
+}
+
+}  // namespace
+
+// Probes io_uring once by creating and destroying a small ring, and caches
+// the answer for the life of the process. A function-local static is
+// initialised exactly once, even with concurrent callers.
+bool IOUringFastIO::available() {
+  static const bool supported = [] {
+    struct io_uring ring;
+    if (io_uring_queue_init(kProbeQueueDepth, &ring, 0) != 0) return false;
+    io_uring_queue_exit(&ring);
+    return true;
+  }();
+  return supported;
+}
+
+// Reads all requests through the ring, keeping at most one ring's worth of
+// reads outstanding and refilling the submission queue as completions arrive.
+//
+// result[i] becomes true once request i has been read completely; a short
+// read is resubmitted for the missing bytes. Returns {code, successes}: code
+// is 0 if every request was read completely, otherwise the first error as a
+// negative errno (-EIO for an unexpected end-of-file, -EINVAL if the ring is
+// not initialised); successes counts the completely read requests.
+//
+// Even on error the function returns only once no submitted read is left in
+// the kernel, because the caller may free the buffers as soon as it returns.
+std::pair<int, int> IOUringFastIO::read(int fd, std::vector<IORequest> &request,
+                                        std::vector<bool> &result) {
+  const size_t total_requests = request.size();
+  if (total_requests == 0) return {0, 0};
+  if (status_ != 'i') return {-EINVAL, 0};
+
+  result.assign(total_requests, false);
+
+  // Cap outstanding reads at the SQ size; the CQ is twice as large by
+  // default, so completions cannot overflow it.
+  const size_t depth = *ring_.sq.kring_entries;
+  std::vector<size_t> done(total_requests, 0);  // bytes read per request
+  std::vector<size_t> retry;  // partially read requests to resubmit
+  size_t next = 0;            // first request not submitted yet
+  size_t queued = 0;          // SQEs prepared but not consumed by the kernel
+  size_t inflight = 0;        // submitted reads whose CQE is not reaped yet
+  size_t finished = 0;        // requests with a final outcome
+  int success_read = 0;
+  int retcode = 0;
+
+  // Book-keeping for one completion.
+  auto reap = [&](const struct io_uring_cqe *cqe) {
+    const size_t idx = static_cast<size_t>(
+        reinterpret_cast<uintptr_t>(io_uring_cqe_get_data(cqe)));
+    --inflight;
+    if (cqe->res > 0) {
+      done[idx] += static_cast<size_t>(cqe->res);
+      if (done[idx] < request[idx].size) {
+        retry.push_back(idx);  // short read: fetch the rest later
+        return;
+      }
+      result[idx] = true;
+      ++success_read;
+    } else if (retcode == 0) {
+      // res == 0: end-of-file reached before the request was satisfied
+      retcode = cqe->res == 0 ? -EIO : cqe->res;
+    }
+    ++finished;
+  };
+
+  while (finished < total_requests) {
+    // Queue reads while allowed: resubmissions first, then new requests.
+    while ((!retry.empty() || next < total_requests) &&
+           queued + inflight < depth) {
+      const bool resubmit = !retry.empty();
+      const size_t idx = resubmit ? retry.back() : next;
+      const IORequest &req = request[idx];
+      if (req.size == 0) {  // only a new request can be empty
+        result[idx] = true;
+        ++success_read;
+        ++finished;
+        ++next;
+        continue;
+      }
+      struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
+      if (sqe == nullptr) break;  // SQ full: submit what is queued first
+      if (resubmit) {
+        retry.pop_back();
+      } else {
+        ++next;
+      }
+      const size_t remaining = req.size - done[idx];
+      io_uring_prep_read(sqe, fd, static_cast<char *>(req.buffer) + done[idx],
+                         static_cast<unsigned>(std::min(remaining, kMaxReadChunk)),
+                         req.offset + static_cast<off_t>(done[idx]));
+      io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(static_cast<uintptr_t>(idx)));
+      ++queued;
+    }
+    if (finished == total_requests) break;  // only empty requests were left
+
+    int rc;
+    do {
+      rc = io_uring_submit_and_wait(&ring_, 1);
+    } while (rc == -EINTR);
+    if (rc < 0) {
+      if (retcode == 0) retcode = rc;
+      break;  // stop issuing reads; in-flight ones are drained below
+    }
+    const size_t accepted = std::min(queued, static_cast<size_t>(rc));
+    queued -= accepted;
+    inflight += accepted;
+
+    unsigned head;
+    unsigned count = 0;
+    struct io_uring_cqe *cqe;
+    io_uring_for_each_cqe(&ring_, head, cqe) {
+      reap(cqe);
+      ++count;
+    }
+    io_uring_cq_advance(&ring_, count);
+  }
+
+  // After a submission error, reads accepted earlier may still be running;
+  // wait for them so the kernel is done with the caller's buffers.
+  while (inflight > 0) {
+    struct io_uring_cqe *cqe = nullptr;
+    const int rc = io_uring_wait_cqe(&ring_, &cqe);
+    if (rc == -EINTR) continue;
+    if (rc < 0) break;  // cannot wait any longer
+    reap(cqe);
+    io_uring_cqe_seen(&ring_, cqe);
+  }
+  return {retcode, success_read};
+}
+
+// Reads the requests one at a time with blocking pread(2) calls. Same
+// return convention as IOUringFastIO::read: {0 or the first negative errno,
+// number of completely read requests}.
+std::pair<int, int> SyncFastIO::read(int fd, std::vector<IORequest> &request,
+                                     std::vector<bool> &result) {
+  const size_t total_requests = request.size();
+  if (total_requests == 0) return {0, 0};
+
+  result.assign(total_requests, false);
+
+  int success_read = 0;
+  int retcode = 0;
+  for (size_t i = 0; i < total_requests; ++i) {
+    const int rc = PReadFully(fd, request[i]);
+    if (rc == 0) {
+      result[i] = true;
+      ++success_read;
+    } else if (retcode == 0) {
+      retcode = rc;  // keep the first error but still try the rest
+    }
+  }
+  return {retcode, success_read};
+}
+
+} // namespace pax
\ No newline at end of file
diff --git a/contrib/pax_storage/src/cpp/comm/fast_io.h 
b/contrib/pax_storage/src/cpp/comm/fast_io.h
new file mode 100644
index 00000000000..da63b4d89ea
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/comm/fast_io.h
@@ -0,0 +1,119 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * fast_io.h
+ *
+ * IDENTIFICATION
+ *       contrib/pax_storage/src/cpp/comm/fast_io.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#pragma once
+
+#include "comm/common_io.h"
+
+#include <liburing.h>
+#include <cstddef>
+#include <cstdio>
+#include <algorithm>
+#include <utility>
+#include <vector>
+
+namespace pax
+{
+
+// One-shot helpers: construct a reader of type T (SyncFastIO or
+// IOUringFastIO) sized for `request` and read every request from `fd`.
+// fast_io_read2 returns {status, number of successful reads};
+// fast_io_read returns only the status (0 on success, negative errno
+// otherwise).
+template<typename T>
+std::pair<int, int> fast_io_read2(int fd, std::vector<IORequest> &request) {
+  T io_handler(request.size());
+  std::vector<bool> result;  // per-request outcome; not exposed here
+  return io_handler.read(fd, request, result);
+}
+
+template<typename T>
+int fast_io_read(int fd, std::vector<IORequest> &request) {
+  return fast_io_read2<T>(fd, request).first;
+}
+
+// Reads the requests one after another with blocking pread(2) calls.
+class SyncFastIO {
+public:
+  explicit SyncFastIO(size_t /*dummy_queue_size*/ = 0) {}
+
+  // Same contract as IOUringFastIO::read.
+  std::pair<int, int> read(int fd, std::vector<IORequest> &request,
+                           std::vector<bool> &result);
+};
+
+// io_uring-based reader: submits a batch of reads at once so the kernel can
+// service them concurrently. Each instance owns one ring.
+class IOUringFastIO {
+public:
+  // Largest ring ever requested. available() probes io_uring with a
+  // 128-entry ring, and a larger ring may fail where that probe succeeded
+  // (io_uring_setup rejects more than 32768 entries, and older kernels
+  // charge ring memory against RLIMIT_MEMLOCK).
+  static constexpr size_t kMaxQueueDepth = 128;
+
+  // queue_size is a hint, normally the batch size; it is clamped to
+  // [1, kMaxQueueDepth]. read() refills the ring as completions arrive, so
+  // batches larger than the ring are still fully served.
+  explicit IOUringFastIO(size_t queue_size = kMaxQueueDepth) {
+    size_t depth = queue_size;
+    if (depth == 0) depth = 1;
+    if (depth > kMaxQueueDepth) depth = kMaxQueueDepth;
+    int ret = io_uring_queue_init(static_cast<unsigned>(depth), &ring_, 0);
+
+    // ret < 0: unsupported, or the ring could not be set up
+    status_ = ret < 0 ? 'x' : 'i';
+  }
+
+  ~IOUringFastIO() {
+    if (status_ == 'i')
+      io_uring_queue_exit(&ring_);
+  }
+
+  // Owns a kernel ring: neither copyable nor movable.
+  IOUringFastIO(const IOUringFastIO &) = delete;
+  IOUringFastIO &operator=(const IOUringFastIO &) = delete;
+
+  // Returns true if this process can create an io_uring instance.
+  static bool available();
+
+  // Reads every request in `request` into its buffer. `result` is resized
+  // to request.size() and result[i] reports whether request i succeeded.
+  // Returns {code, successes}: code is 0 when every request succeeded,
+  // otherwise a negative errno (-EINVAL if the ring is not initialised);
+  // successes is the number of successful requests.
+  std::pair<int, int> read(int fd, std::vector<IORequest> &request,
+                           std::vector<bool> &result);
+
+private:
+  struct io_uring ring_;
+
+  // 'u' for uninitialized, 'i' for initialized, 'x' for unsupported
+  char status_ = 'u';
+};
+
+} // namespace pax
\ No newline at end of file
diff --git a/contrib/pax_storage/src/cpp/storage/file_system.cc 
b/contrib/pax_storage/src/cpp/storage/file_system.cc
index fa0667241e8..2ac4803ba8c 100644
--- a/contrib/pax_storage/src/cpp/storage/file_system.cc
+++ b/contrib/pax_storage/src/cpp/storage/file_system.cc
@@ -65,4 +65,14 @@ void File::PWriteN(const void *buf, size_t count, off64_t 
offset) {
                  "errno=%d], %s",
                  offset, count, num, errno, DebugString().c_str()));
 }
+
+void File::ReadBatch(const std::vector<IORequest> &requests) const {
+  // Default, strictly sequential implementation: one blocking PReadN per
+  // request, in the order given. Subclasses may override it (LocalFile
+  // does). An empty batch issues no reads.
+  for (const IORequest &io : requests) {
+    PReadN(io.buffer, io.size, io.offset);
+  }
+}
+
 }  // namespace pax
diff --git a/contrib/pax_storage/src/cpp/storage/file_system.h 
b/contrib/pax_storage/src/cpp/storage/file_system.h
index ca1af8877cc..6569ee3b858 100644
--- a/contrib/pax_storage/src/cpp/storage/file_system.h
+++ b/contrib/pax_storage/src/cpp/storage/file_system.h
@@ -33,6 +33,7 @@
 #include <string>
 #include <vector>
 
+#include "comm/common_io.h"
 #include "comm/pax_memory.h"
 
 namespace pax {
@@ -74,6 +75,7 @@ class File {
   virtual void WriteN(const void *ptr, size_t n);
   virtual void PWriteN(const void *buf, size_t count, off_t offset);
   virtual void PReadN(void *buf, size_t count, off_t offset) const;
+  virtual void ReadBatch(const std::vector<IORequest> &requests) const;
 
   virtual void Flush() = 0;
   virtual void Delete() = 0;
diff --git a/contrib/pax_storage/src/cpp/storage/local_file_system.cc 
b/contrib/pax_storage/src/cpp/storage/local_file_system.cc
index 1c71eceb7a9..82afafbfcfd 100644
--- a/contrib/pax_storage/src/cpp/storage/local_file_system.cc
+++ b/contrib/pax_storage/src/cpp/storage/local_file_system.cc
@@ -35,6 +35,7 @@
 
 #include "access/pax_access_handle.h"
 #include "comm/cbdb_wrappers.h"
+#include "comm/fast_io.h"
 #include "comm/fmt.h"
 #include "comm/pax_memory.h"
 #include "comm/pax_resource.h"
@@ -51,6 +52,7 @@ class LocalFile final : public File {
   ssize_t Write(const void *ptr, size_t n) override;
   ssize_t PWrite(const void *ptr, size_t n, off_t offset) override;
   ssize_t PRead(void *ptr, size_t n, off_t offset) const override;
+  void ReadBatch(const std::vector<IORequest> &requests) const override;
   size_t FileLength() const override;
   void Flush() override;
   void Delete() override;
@@ -132,6 +134,40 @@ ssize_t LocalFile::PWrite(const void *ptr, size_t n, off_t offset) {
   return num;
 }
 
+// Reads every request of the batch into its buffer. io_uring is used when
+// IOUringFastIO::available() reports support; otherwise, or when the ring
+// cannot be set up for this batch, the requests are read with synchronous
+// pread(2) calls. Raises kExTypeIOError if the reader reports an error.
+void LocalFile::ReadBatch(const std::vector<IORequest> &requests) const {
+  if (unlikely(requests.empty())) return;
+
+  // The FastIO readers take a non-const vector but only read the request
+  // descriptors, so casting away const does not modify `requests`.
+  auto &reqs = const_cast<std::vector<IORequest> &>(requests);
+  std::vector<bool> result(requests.size(), false);
+  std::pair<int, int> res{0, 0};
+  bool used_uring = IOUringFastIO::available();
+
+  if (used_uring) {
+    IOUringFastIO fast_io(requests.size());
+    res = fast_io.read(fd_, reqs, result);
+    // read() also returns -EINVAL with nothing read when the ring could not
+    // be created for this batch, even though the support probe succeeded;
+    // fall back to synchronous reads instead of failing.
+    if (res.first == -EINVAL && res.second == 0) used_uring = false;
+  }
+  if (!used_uring) {
+    SyncFastIO fast_io;
+    res = fast_io.read(fd_, reqs, result);
+  }
+
+  CBDB_CHECK(res.first == 0, cbdb::CException::ExType::kExTypeIOError,
+             fmt("Fail to ReadBatch with %s [successful=%d, total=%lu, "
+                 "error=%d], %s",
+                 used_uring ? "io_uring" : "sync read", res.second,
+                 requests.size(), res.first, DebugString().c_str()));
+}
+
 size_t LocalFile::FileLength() const {
   struct stat file_stat {};
   int rc;
diff --git a/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc 
b/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc
index 9e939305c61..ad1736f4f08 100644
--- a/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc
+++ b/contrib/pax_storage/src/cpp/storage/orc/orc_format_reader.cc
@@ -327,6 +327,7 @@ pax::porc::proto::StripeFooter 
OrcFormatReader::ReadStripeWithProjection(
 
   batch_offset = stripe_footer_offset;
 
+  std::vector<IORequest> io_requests;
   while (index < column_types_.size()) {
     // Current column have been skipped
     // Move `batch_offset` and `streams_index` to the right position
@@ -398,10 +399,17 @@ pax::porc::proto::StripeFooter 
OrcFormatReader::ReadStripeWithProjection(
       continue;
     }
 
-    file_->PReadN(data_buffer->GetAvailableBuffer(), batch_len, batch_offset);
+    {
+      IORequest io_request;
+      io_request.offset = batch_offset;
+      io_request.size = batch_len;
+      io_request.buffer = data_buffer->GetAvailableBuffer();
+      io_requests.emplace_back(io_request);
+    }
     data_buffer->Brush(batch_len);
     batch_offset += batch_len;
   }
+  file_->ReadBatch(io_requests);
 
   return stripe_footer;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to