This is an automated email from the ASF dual-hosted git repository.

asukaminato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bfd360db feat(bindings/cpp): cpp async op && reader, lister (#6228)
5bfd360db is described below

commit 5bfd360db9adbd0d6f33b62ad295c0aa1cba8e0e
Author: Asuka Minato <[email protected]>
AuthorDate: Mon Jun 16 19:47:57 2025 +0900

    feat(bindings/cpp): cpp async op && reader, lister (#6228)
    
    * add list support
    
    * add more async op
    
    * try add tests
    
    * fmt
    
    * try fix
    
    * use co_await
    
    * see whether other works
    
    * skip copy test
    
    * try fix
    
    * try ?
    
    * pass the test
    
    * rm
    
    * async reader / lister
    
    * fix some comment
    
    * not in Reader &Reader::operator=(Reader &&other).
    
    * try add valgrind for async
    
    * fix conflict
    
    * rm valgrind
---
 .github/workflows/ci_bindings_cpp.yml  |  11 +-
 bindings/cpp/Cargo.toml                |   1 +
 bindings/cpp/README.md                 |   1 +
 bindings/cpp/include/async_defs.hpp    |   5 +
 bindings/cpp/include/opendal_async.hpp | 103 +++++++++++
 bindings/cpp/src/async.rs              | 242 +++++++++++++++++++++++++
 bindings/cpp/src/opendal_async.cpp     | 107 +++++++++++
 bindings/cpp/tests/async_test.cpp      | 315 +++++++++++++++++++++++++++++++++
 8 files changed, 784 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci_bindings_cpp.yml 
b/.github/workflows/ci_bindings_cpp.yml
index 709297c4b..4765e1de9 100644
--- a/.github/workflows/ci_bindings_cpp.yml
+++ b/.github/workflows/ci_bindings_cpp.yml
@@ -84,7 +84,16 @@ jobs:
           cmake -GNinja -DOPENDAL_DEV=ON -DOPENDAL_ENABLE_ASYNC=ON 
-DCMAKE_CXX_COMPILER=clang++-18 ..
           ninja
           ./opendal_cpp_test
-      
+
+      - name: test async with valgrind
+        working-directory: "bindings/cpp"
+        run: |
+          mkdir build-valgrind-async
+          cd build-valgrind-async
+          cmake -GNinja -DOPENDAL_ENABLE_ASYNC=ON -DOPENDAL_ENABLE_TESTING=ON 
-DCMAKE_CXX_COMPILER=clang++-18 ..
+          ninja
+          valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes 
--verbose ./opendal_cpp_test
+
       - name: Build Cpp examples
         working-directory: "examples/cpp"
         run: |
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index c2861de61..8497d439a 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -34,6 +34,7 @@ anyhow = "1.0"
 chrono = "0.4"
 cxx = "1.0"
 cxx-async = { version = "0.1.2", optional = true }
+futures = "0.3"
 # this crate won't be published, we always use the local version
 opendal = { version = ">=0", path = "../../core", features = [
   "blocking",
diff --git a/bindings/cpp/README.md b/bindings/cpp/README.md
index 46501c540..6dc7f0084 100644
--- a/bindings/cpp/README.md
+++ b/bindings/cpp/README.md
@@ -101,6 +101,7 @@ make docs
 - `OPENDAL_ENABLE_DOCUMENTATION`: Enable documentation. Default: `OFF`
 - `OPENDAL_DOCS_ONLY`: Only build documentation. Default: `OFF`
 - `OPENDAL_ENABLE_TESTING`: Enable testing. Default: `OFF`
+- `OPENDAL_ENABLE_ASYNC`: Enable async support. Default: `OFF`
 
 ## License and Trademarks
 
diff --git a/bindings/cpp/include/async_defs.hpp 
b/bindings/cpp/include/async_defs.hpp
index c8a831d0c..3b2e6c193 100644
--- a/bindings/cpp/include/async_defs.hpp
+++ b/bindings/cpp/include/async_defs.hpp
@@ -24,3 +24,8 @@
 
 CXXASYNC_DEFINE_FUTURE(rust::Vec<uint8_t>, opendal, ffi, async, 
RustFutureRead);
 CXXASYNC_DEFINE_FUTURE(void, opendal, ffi, async, RustFutureWrite);
+CXXASYNC_DEFINE_FUTURE(rust::Vec<rust::String>, opendal, ffi, async, 
RustFutureList);
+CXXASYNC_DEFINE_FUTURE(bool, opendal, ffi, async, RustFutureBool);
+CXXASYNC_DEFINE_FUTURE(size_t, opendal, ffi, async, RustFutureReaderId);
+CXXASYNC_DEFINE_FUTURE(size_t, opendal, ffi, async, RustFutureListerId);
+CXXASYNC_DEFINE_FUTURE(rust::String, opendal, ffi, async, 
RustFutureEntryOption);
diff --git a/bindings/cpp/include/opendal_async.hpp 
b/bindings/cpp/include/opendal_async.hpp
index 45524df5b..1f5deca10 100644
--- a/bindings/cpp/include/opendal_async.hpp
+++ b/bindings/cpp/include/opendal_async.hpp
@@ -19,8 +19,12 @@
 
 #pragma once
 
+#include <cstdint>
 #include <optional>
 #include <span>
+#include <string>
+#include <string_view>
+#include <unordered_map>
 
 #include "async.rs.h"
 #include "async_defs.hpp"
@@ -47,8 +51,107 @@ class Operator {
   using WriteFuture = opendal::ffi::async::RustFutureWrite;
   WriteFuture write(std::string_view path, std::span<uint8_t> data);
 
+  using ListFuture = opendal::ffi::async::RustFutureList;
+  ListFuture list(std::string_view path);
+
+  using ExistsFuture = opendal::ffi::async::RustFutureBool;
+  ExistsFuture exists(std::string_view path);
+
+  using CreateDirFuture = opendal::ffi::async::RustFutureWrite;
+  CreateDirFuture create_dir(std::string_view path);
+
+  using CopyFuture = opendal::ffi::async::RustFutureWrite;
+  CopyFuture copy(std::string_view from, std::string_view to);
+
+  using RenameFuture = opendal::ffi::async::RustFutureWrite;
+  RenameFuture rename(std::string_view from, std::string_view to);
+
+  using DeleteFuture = opendal::ffi::async::RustFutureWrite;
+  DeleteFuture delete_path(std::string_view path);
+
+  using RemoveAllFuture = opendal::ffi::async::RustFutureWrite;
+  RemoveAllFuture remove_all(std::string_view path);
+
+  using ReaderFuture = opendal::ffi::async::RustFutureReaderId;
+  ReaderFuture reader(std::string_view path);
+
+  using ListerFuture = opendal::ffi::async::RustFutureListerId;
+  ListerFuture lister(std::string_view path);
+
  private:
   rust::Box<opendal::ffi::async::Operator> operator_;
 };
 
+/**
+ * @class Reader
+ * @brief Async Reader is designed to read data from a specific path in an 
asynchronous manner.
+ * @details It provides streaming read operations with range support.
+ */
+class Reader {
+ public:
+  // Disable copy and assign
+  Reader(const Reader &) = delete;
+  Reader &operator=(const Reader &) = delete;
+
+  // Enable move
+  Reader(Reader &&other) noexcept;
+  Reader &operator=(Reader &&other) noexcept;
+  ~Reader() noexcept;
+
+  // Constructor from ID (for tests and advanced usage)
+  explicit Reader(size_t reader_id) noexcept;
+
+  using ReadFuture = opendal::ffi::async::RustFutureRead;
+  
+  /**
+   * @brief Read data from the specified range
+   * @param start Start offset in bytes
+   * @param len Number of bytes to read
+   * @return Future that resolves to the read data
+   */
+  ReadFuture read(uint64_t start, uint64_t len);
+
+ private:
+  friend class Operator;
+  
+  void destroy() noexcept;
+
+  size_t reader_id_{0};
+};
+
+/**
+ * @class Lister
+ * @brief Async Lister is designed to list entries at a specified path in an 
asynchronous manner.
+ * @details It provides streaming iteration over directory entries.
+ */
+class Lister {
+ public:
+  // Disable copy and assign  
+  Lister(const Lister &) = delete;
+  Lister &operator=(const Lister &) = delete;
+
+  // Enable move
+  Lister(Lister &&other) noexcept;
+  Lister &operator=(Lister &&other) noexcept;
+  ~Lister() noexcept;
+
+  // Constructor from ID (for tests and advanced usage)
+  explicit Lister(size_t lister_id) noexcept;
+
+  using NextFuture = opendal::ffi::async::RustFutureEntryOption;
+  
+  /**
+   * @brief Get the next entry in the listing
+   * @return Future that resolves to the next entry path, or empty string if 
no more entries
+   */
+  NextFuture next();
+
+ private:
+  friend class Operator;
+  
+  void destroy() noexcept;
+
+  size_t lister_id_{0};
+};
+
 }  // namespace opendal::async
diff --git a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs
index b4f94899e..323a2b69b 100644
--- a/bindings/cpp/src/async.rs
+++ b/bindings/cpp/src/async.rs
@@ -22,6 +22,8 @@ use std::collections::HashMap;
 use std::future::Future;
 use std::ops::Deref;
 use std::str::FromStr;
+use std::sync::{Arc, OnceLock};
+use tokio::sync::Mutex;
 
 #[cxx::bridge(namespace = "opendal::ffi::async")]
 mod ffi {
@@ -37,17 +39,47 @@ mod ffi {
         op: *const Operator,
     }
 
+    pub struct ReaderPtr {
+        id: usize,
+    }
+
+    pub struct ListerPtr {
+        id: usize,
+    }
+
     extern "Rust" {
         type Operator;
+        type Reader;
+        type Lister;
 
         fn new_operator(scheme: &str, configs: Vec<HashMapValue>) -> 
Result<Box<Operator>>;
         unsafe fn operator_read(op: OperatorPtr, path: String) -> 
RustFutureRead;
         unsafe fn operator_write(op: OperatorPtr, path: String, bs: Vec<u8>) 
-> RustFutureWrite;
+        unsafe fn operator_list(op: OperatorPtr, path: String) -> 
RustFutureList;
+        unsafe fn operator_exists(op: OperatorPtr, path: String) -> 
RustFutureBool;
+        unsafe fn operator_create_dir(op: OperatorPtr, path: String) -> 
RustFutureWrite;
+        unsafe fn operator_copy(op: OperatorPtr, from: String, to: String) -> 
RustFutureWrite;
+        unsafe fn operator_rename(op: OperatorPtr, from: String, to: String) 
-> RustFutureWrite;
+        unsafe fn operator_delete(op: OperatorPtr, path: String) -> 
RustFutureWrite;
+        unsafe fn operator_remove_all(op: OperatorPtr, path: String) -> 
RustFutureWrite;
+        unsafe fn operator_reader(op: OperatorPtr, path: String) -> 
RustFutureReaderId;
+        unsafe fn operator_lister(op: OperatorPtr, path: String) -> 
RustFutureListerId;
+
+        unsafe fn reader_read(reader: ReaderPtr, start: u64, len: u64) -> 
RustFutureRead;
+        unsafe fn lister_next(lister: ListerPtr) -> RustFutureEntryOption;
+
+        fn delete_reader(reader: ReaderPtr);
+        fn delete_lister(lister: ListerPtr);
     }
 
     extern "C++" {
         type RustFutureRead = super::RustFutureRead;
         type RustFutureWrite = super::RustFutureWrite;
+        type RustFutureList = super::RustFutureList;
+        type RustFutureBool = super::RustFutureBool;
+        type RustFutureReaderId = super::RustFutureReaderId;
+        type RustFutureListerId = super::RustFutureListerId;
+        type RustFutureEntryOption = super::RustFutureEntryOption;
     }
 }
 
@@ -61,7 +93,62 @@ unsafe impl Future for RustFutureWrite {
     type Output = ();
 }
 
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureList {
+    type Output = Vec<String>;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureBool {
+    type Output = bool;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureReaderId {
+    type Output = usize;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureListerId {
+    type Output = usize;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureEntryOption {
+    type Output = String;
+}
+
 pub struct Operator(od::Operator);
+pub struct Reader {
+    reader: Arc<od::Reader>,
+    id: usize,
+}
+pub struct Lister {
+    lister: Arc<Mutex<od::Lister>>,
+    id: usize,
+}
+
+// Global storage for readers and listers to avoid Send issues with raw 
pointers
+static READER_STORAGE: OnceLock<Mutex<HashMap<usize, Arc<od::Reader>>>> = 
OnceLock::new();
+static READER_COUNTER: OnceLock<Mutex<usize>> = OnceLock::new();
+static LISTER_STORAGE: OnceLock<Mutex<HashMap<usize, Arc<Mutex<od::Lister>>>>> 
= OnceLock::new();
+static LISTER_COUNTER: OnceLock<Mutex<usize>> = OnceLock::new();
+
+fn get_reader_storage() -> &'static Mutex<HashMap<usize, Arc<od::Reader>>> {
+    READER_STORAGE.get_or_init(|| Mutex::new(HashMap::new()))
+}
+
+fn get_reader_counter() -> &'static Mutex<usize> {
+    READER_COUNTER.get_or_init(|| Mutex::new(0))
+}
+
+fn get_lister_storage() -> &'static Mutex<HashMap<usize, 
Arc<Mutex<od::Lister>>>> {
+    LISTER_STORAGE.get_or_init(|| Mutex::new(HashMap::new()))
+}
+
+fn get_lister_counter() -> &'static Mutex<usize> {
+    LISTER_COUNTER.get_or_init(|| Mutex::new(0))
+}
 
 fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) -> 
Result<Box<Operator>> {
     let scheme = od::Scheme::from_str(scheme)?;
@@ -85,6 +172,8 @@ impl Deref for ffi::OperatorPtr {
 }
 
 unsafe impl Send for ffi::OperatorPtr {}
+unsafe impl Send for ffi::ReaderPtr {}
+unsafe impl Send for ffi::ListerPtr {}
 
 unsafe fn operator_read(op: ffi::OperatorPtr, path: String) -> RustFutureRead {
     RustFutureRead::fallible(async move {
@@ -105,3 +194,156 @@ unsafe fn operator_write(op: ffi::OperatorPtr, path: 
String, bs: Vec<u8>) -> Rus
             .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))
     })
 }
+
+unsafe fn operator_list(op: ffi::OperatorPtr, path: String) -> RustFutureList {
+    RustFutureList::fallible(async move {
+        let entries =
+            op.0.list(&path)
+                .await
+                .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+        Ok(entries.into_iter().map(|e| e.path().to_string()).collect())
+    })
+}
+
+unsafe fn operator_exists(op: ffi::OperatorPtr, path: String) -> 
RustFutureBool {
+    RustFutureBool::fallible(async move {
+        op.0.exists(&path)
+            .await
+            .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))
+    })
+}
+
+unsafe fn operator_create_dir(op: ffi::OperatorPtr, path: String) -> 
RustFutureWrite {
+    RustFutureWrite::fallible(async move {
+        op.0.create_dir(&path)
+            .await
+            .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))
+    })
+}
+
+unsafe fn operator_copy(op: ffi::OperatorPtr, from: String, to: String) -> 
RustFutureWrite {
+    RustFutureWrite::fallible(async move {
+        op.0.copy(&from, &to)
+            .await
+            .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))
+    })
+}
+
+unsafe fn operator_rename(op: ffi::OperatorPtr, from: String, to: String) -> 
RustFutureWrite {
+    RustFutureWrite::fallible(async move {
+        op.0.rename(&from, &to)
+            .await
+            .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))
+    })
+}
+
+unsafe fn operator_delete(op: ffi::OperatorPtr, path: String) -> 
RustFutureWrite {
+    RustFutureWrite::fallible(async move {
+        op.0.delete(&path)
+            .await
+            .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))
+    })
+}
+
+unsafe fn operator_remove_all(op: ffi::OperatorPtr, path: String) -> 
RustFutureWrite {
+    RustFutureWrite::fallible(async move {
+        op.0.remove_all(&path)
+            .await
+            .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))
+    })
+}
+
+unsafe fn operator_reader(op: ffi::OperatorPtr, path: String) -> 
RustFutureReaderId {
+    RustFutureReaderId::fallible(async move {
+        let reader =
+            op.0.reader(&path)
+                .await
+                .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+
+        // Store the reader in global storage and return an ID
+        let reader_arc = Arc::new(reader);
+        let mut counter = get_reader_counter().lock().await;
+        *counter += 1;
+        let id = *counter;
+
+        let mut storage = get_reader_storage().lock().await;
+        storage.insert(id, reader_arc);
+
+        Ok(id)
+    })
+}
+
+unsafe fn operator_lister(op: ffi::OperatorPtr, path: String) -> 
RustFutureListerId {
+    RustFutureListerId::fallible(async move {
+        let lister =
+            op.0.lister(&path)
+                .await
+                .map_err(|e| 
CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+
+        // Store the lister in global storage and return an ID
+        let lister_arc = Arc::new(Mutex::new(lister));
+        let mut counter = get_lister_counter().lock().await;
+        *counter += 1;
+        let id = *counter;
+
+        let mut storage = get_lister_storage().lock().await;
+        storage.insert(id, lister_arc);
+
+        Ok(id)
+    })
+}
+
+unsafe fn reader_read(reader: ffi::ReaderPtr, start: u64, len: u64) -> RustFutureRead {
+    RustFutureRead::fallible(async move { // future yielding the bytes in [start, start + len)
+        let storage = get_reader_storage().lock().await; // resolve the id to its stored reader
+        let reader_arc = storage
+            .get(&reader.id)
+            .ok_or_else(|| CxxAsyncException::new("Invalid reader ID".into()))?
+            .clone();
+        drop(storage); // release the registry lock before awaiting the read itself
+
+        let buffer = reader_arc
+            .read(start..start.saturating_add(len)) // clamp: plain `start + len` can overflow u64
+            .await
+            .map_err(|e| CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+        Ok(buffer.to_vec())
+    })
+}
+
+unsafe fn lister_next(lister: ffi::ListerPtr) -> RustFutureEntryOption {
+    RustFutureEntryOption::fallible(async move {
+        use futures::TryStreamExt;
+
+        let storage = get_lister_storage().lock().await;
+        let lister_arc = storage
+            .get(&lister.id)
+            .ok_or_else(|| CxxAsyncException::new("Invalid lister ID".into()))?
+            .clone();
+        drop(storage);
+
+        let mut lister_guard = lister_arc.lock().await;
+        match lister_guard.try_next().await {
+            Ok(Some(entry)) => Ok(entry.path().to_string()),
+            Ok(None) => Ok(String::new()), // Empty string indicates end of 
iteration
+            Err(e) => 
Err(CxxAsyncException::new(e.to_string().into_boxed_str())),
+        }
+    })
+}
+
+fn delete_reader(reader: ffi::ReaderPtr) {
+    // Non-blocking `try_lock`: this sync fn (called from C++ destructors) cannot `.await`.
+    if let Ok(mut storage) = get_reader_storage().try_lock() {
+        storage.remove(&reader.id);
+    }
+    // NOTE(review): if the lock is held elsewhere the removal is skipped and never
+    // retried here, so the reader stays in READER_STORAGE; chosen over panicking.
+}
+
+fn delete_lister(lister: ffi::ListerPtr) {
+    // Non-blocking `try_lock`: this sync fn (called from C++ destructors) cannot `.await`.
+    if let Ok(mut storage) = get_lister_storage().try_lock() {
+        storage.remove(&lister.id);
+    }
+    // NOTE(review): if the lock is held elsewhere the removal is skipped and never
+    // retried here, so the lister stays in LISTER_STORAGE; chosen over panicking.
+}
diff --git a/bindings/cpp/src/opendal_async.cpp 
b/bindings/cpp/src/opendal_async.cpp
index 6ec2dccad..3999a8d0e 100644
--- a/bindings/cpp/src/opendal_async.cpp
+++ b/bindings/cpp/src/opendal_async.cpp
@@ -58,3 +58,110 @@ Operator::WriteFuture Operator::write(std::string_view path,
   return opendal::ffi::async::operator_write(
       opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path), vec);
 }
+
+Operator::ListFuture Operator::list(std::string_view path) {
+  return opendal::ffi::async::operator_list(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::ExistsFuture Operator::exists(std::string_view path) {
+  return opendal::ffi::async::operator_exists(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::CreateDirFuture Operator::create_dir(std::string_view path) {
+  return opendal::ffi::async::operator_create_dir(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::CopyFuture Operator::copy(std::string_view from, std::string_view 
to) {
+  return opendal::ffi::async::operator_copy(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(from),
+      RUST_STRING(to));
+}
+
+Operator::RenameFuture Operator::rename(std::string_view from, 
std::string_view to) {
+  return opendal::ffi::async::operator_rename(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(from),
+      RUST_STRING(to));
+}
+
+Operator::DeleteFuture Operator::delete_path(std::string_view path) {
+  return opendal::ffi::async::operator_delete(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::RemoveAllFuture Operator::remove_all(std::string_view path) {
+  return opendal::ffi::async::operator_remove_all(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::ReaderFuture Operator::reader(std::string_view path) {
+  return opendal::ffi::async::operator_reader(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::ListerFuture Operator::lister(std::string_view path) {
+  return opendal::ffi::async::operator_lister(
+      opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+// Reader implementation
+Reader::Reader(size_t reader_id) noexcept : reader_id_(reader_id) {}
+
+Reader::Reader(Reader &&other) noexcept : reader_id_(other.reader_id_) {
+  other.reader_id_ = 0;
+}
+
+Reader &Reader::operator=(Reader &&other) noexcept {
+  if (this == &other) return *this;  // self-move: keep the handle we hold
+  destroy();  // release the reader we already own, otherwise its id leaks
+  reader_id_ = other.reader_id_;  // adopt the source's reader id
+  other.reader_id_ = 0;  // 0 == "owns nothing", so other's dtor is a no-op
+  return *this;
+}
+
+Reader::~Reader() noexcept { destroy(); }
+
+void Reader::destroy() noexcept {
+  if (reader_id_ != 0) {
+    opendal::ffi::async::delete_reader(
+        opendal::ffi::async::ReaderPtr{reader_id_});
+    reader_id_ = 0;
+  }
+}
+
+Reader::ReadFuture Reader::read(uint64_t start, uint64_t len) {
+  return opendal::ffi::async::reader_read(
+      opendal::ffi::async::ReaderPtr{reader_id_}, start, len);
+}
+
+// Lister implementation
+Lister::Lister(size_t lister_id) noexcept : lister_id_(lister_id) {}
+
+Lister::Lister(Lister &&other) noexcept : lister_id_(other.lister_id_) {
+  other.lister_id_ = 0;
+}
+
+Lister &Lister::operator=(Lister &&other) noexcept {
+  if (this == &other) return *this;  // self-move: keep the handle we hold
+  destroy();  // release the lister we already own, otherwise its id leaks
+  lister_id_ = other.lister_id_;  // adopt the source's lister id
+  other.lister_id_ = 0;  // 0 == "owns nothing", so other's dtor is a no-op
+  return *this;
+}
+
+Lister::~Lister() noexcept { destroy(); }
+
+void Lister::destroy() noexcept {
+  if (lister_id_ != 0) {
+    opendal::ffi::async::delete_lister(
+        opendal::ffi::async::ListerPtr{lister_id_});
+    lister_id_ = 0;
+  }
+}
+
+Lister::NextFuture Lister::next() {
+  return opendal::ffi::async::lister_next(
+      opendal::ffi::async::ListerPtr{lister_id_});
+}
diff --git a/bindings/cpp/tests/async_test.cpp 
b/bindings/cpp/tests/async_test.cpp
index dc25c2068..8e7ee7ca6 100644
--- a/bindings/cpp/tests/async_test.cpp
+++ b/bindings/cpp/tests/async_test.cpp
@@ -57,3 +57,318 @@ TEST_F(AsyncOpendalTest, BasicTest) {
     co_return;
   }());
 }
+
+TEST_F(AsyncOpendalTest, AsyncOperationsTest) {
+  cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+    const auto dir_path = "test_async_dir/";
+    const auto file_path = "test_async_dir/test_file.txt";
+    std::vector<uint8_t> file_content = {1, 2, 3, 4, 5};
+
+    // Create directory and check existence
+    co_await op->create_dir(dir_path);
+    auto dir_exists = co_await op->exists(dir_path);
+    EXPECT_TRUE(dir_exists);
+
+    // Check non-existent file
+    auto non_existent_exists = co_await op->exists("non_existent_file");
+    EXPECT_FALSE(non_existent_exists);
+
+    // Write a file and check its existence
+    co_await op->write(file_path, file_content);
+    auto file_exists = co_await op->exists(file_path);
+    EXPECT_TRUE(file_exists);
+
+    const auto copied_file_path = "test_async_dir/copied_file.txt";
+    co_await op->write(copied_file_path, file_content);
+    auto copied_file_exists = co_await op->exists(copied_file_path);
+    EXPECT_TRUE(copied_file_exists);
+    auto copied_content_rust_vec = co_await op->read(copied_file_path);
+    EXPECT_EQ(file_content.size(), copied_content_rust_vec.size());
+    for (size_t i = 0; i < file_content.size(); ++i) {
+      EXPECT_EQ(file_content[i], copied_content_rust_vec[i]);
+    }
+
+    const auto renamed_file_path = "test_async_dir/renamed_file.txt";
+    co_await op->write(renamed_file_path, file_content);
+    co_await op->delete_path(copied_file_path);
+    auto old_copied_exists = co_await op->exists(copied_file_path);
+    EXPECT_FALSE(old_copied_exists);
+    auto renamed_file_exists = co_await op->exists(renamed_file_path);
+    EXPECT_TRUE(renamed_file_exists);
+    auto renamed_content_rust_vec = co_await op->read(renamed_file_path);
+    EXPECT_EQ(file_content.size(), renamed_content_rust_vec.size());
+    for (size_t i = 0; i < file_content.size(); ++i) {
+      EXPECT_EQ(file_content[i], renamed_content_rust_vec[i]);
+    }
+
+    // Delete the renamed file and check non-existence
+    co_await op->delete_path(renamed_file_path);
+    auto deleted_exists = co_await op->exists(renamed_file_path);
+    EXPECT_FALSE(deleted_exists);
+
+    // Create another file in the directory
+    const auto another_file_path = "test_async_dir/another_file.txt";
+    co_await op->write(another_file_path, file_content);
+    auto another_file_exists = co_await op->exists(another_file_path);
+    EXPECT_TRUE(another_file_exists);
+
+    // Remove the entire directory and check non-existence of the directory 
and its contents
+    co_await op->remove_all(dir_path);
+    auto dir_after_remove_exists = co_await op->exists(dir_path);
+    EXPECT_FALSE(dir_after_remove_exists);
+    auto file_in_removed_dir_exists = co_await op->exists(file_path); // 
Original file path
+    EXPECT_FALSE(file_in_removed_dir_exists);
+    auto another_file_in_removed_dir_exists = co_await 
op->exists(another_file_path);
+    EXPECT_FALSE(another_file_in_removed_dir_exists);
+
+    // Test listing after cleaning up
+    const auto list_dir_path = "test_list_dir/";
+    const auto list_file1_path = "test_list_dir/file1.txt";
+    const auto list_file2_path = "test_list_dir/file2.txt";
+    const auto list_subdir_path = "test_list_dir/subdir/";
+
+    co_await op->create_dir(list_dir_path);
+    co_await op->write(list_file1_path, file_content);
+    co_await op->write(list_file2_path, file_content);
+    co_await op->create_dir(list_subdir_path);
+
+    auto listed_entries = co_await op->list(list_dir_path);
+    EXPECT_EQ(listed_entries.size(), 4);
+
+    bool found_file1 = false;
+    bool found_file2 = false;
+    bool found_subdir = false;
+    for (const auto& entry : listed_entries) {
+      if (std::string(entry).ends_with("file1.txt")) found_file1 = true;
+      if (std::string(entry).ends_with("file2.txt")) found_file2 = true;
+      if (std::string(entry).ends_with("subdir/")) found_subdir = true;
+    }
+    EXPECT_TRUE(found_file1);
+    EXPECT_TRUE(found_file2);
+    EXPECT_TRUE(found_subdir);
+
+    // Clean up list test directory
+    co_await op->remove_all(list_dir_path);
+    co_return;
+  }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncReaderTest) {
+  cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+    const auto file_path = "test_async_reader.txt";
+    
+    // Create test data - larger to test range reads
+    std::vector<uint8_t> test_data;
+    for (int i = 0; i < 1000; ++i) {
+      test_data.push_back(static_cast<uint8_t>(i % 256));
+    }
+    
+    // Write the test file
+    co_await op->write(file_path, test_data);
+    
+    // Test: Create async reader
+    auto reader_id = co_await op->reader(file_path);
+    opendal::async::Reader reader(reader_id);
+    
+    // Test: Read full file in one go
+    auto full_data = co_await reader.read(0, test_data.size());
+    EXPECT_EQ(full_data.size(), test_data.size());
+    for (size_t i = 0; i < test_data.size(); ++i) {
+      EXPECT_EQ(full_data[i], test_data[i]);
+    }
+    
+    // Test: Read in chunks
+    const size_t chunk_size = 100;
+    std::vector<uint8_t> chunked_data;
+    
+    for (size_t offset = 0; offset < test_data.size(); offset += chunk_size) {
+      size_t read_size = std::min(chunk_size, test_data.size() - offset);
+      auto chunk = co_await reader.read(offset, read_size);
+      
+      EXPECT_EQ(chunk.size(), read_size);
+      chunked_data.insert(chunked_data.end(), chunk.begin(), chunk.end());
+    }
+    
+    // Verify chunked read matches original data
+    EXPECT_EQ(chunked_data.size(), test_data.size());
+    for (size_t i = 0; i < test_data.size(); ++i) {
+      EXPECT_EQ(chunked_data[i], test_data[i]);
+    }
+    
+    // Test: Read specific range
+    auto range_data = co_await reader.read(100, 50);
+    EXPECT_EQ(range_data.size(), 50);
+    for (size_t i = 0; i < 50; ++i) {
+      EXPECT_EQ(range_data[i], test_data[100 + i]);
+    }
+    
+    // Test: Read from end of file
+    auto end_data = co_await reader.read(test_data.size() - 10, 10);
+    EXPECT_EQ(end_data.size(), 10);
+    for (size_t i = 0; i < 10; ++i) {
+      EXPECT_EQ(end_data[i], test_data[test_data.size() - 10 + i]);
+    }
+    
+    // Test: Move semantics
+    auto reader_id2 = co_await op->reader(file_path);
+    opendal::async::Reader reader2(reader_id2);
+    opendal::async::Reader moved_reader = std::move(reader2);
+    
+    auto moved_data = co_await moved_reader.read(0, 100);
+    EXPECT_EQ(moved_data.size(), 100);
+    for (size_t i = 0; i < 100; ++i) {
+      EXPECT_EQ(moved_data[i], test_data[i]);
+    }
+    
+    // Clean up
+    co_await op->delete_path(file_path);
+    co_return;
+  }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncListerTest) {
+  cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+    const auto base_dir = "test_async_lister/";
+    const auto file1_path = "test_async_lister/file1.txt";
+    const auto file2_path = "test_async_lister/file2.txt";
+    const auto file3_path = "test_async_lister/file3.txt";
+    const auto subdir_path = "test_async_lister/subdir/";
+    const auto subdir_file_path = "test_async_lister/subdir/nested_file.txt";
+    
+    auto test_data = std::vector<uint8_t>{1, 2, 3, 4, 5};
+    
+    // Set up test directory structure
+    co_await op->create_dir(base_dir);
+    co_await op->write(file1_path, test_data);
+    co_await op->write(file2_path, test_data);
+    co_await op->write(file3_path, test_data);
+    co_await op->create_dir(subdir_path);
+    co_await op->write(subdir_file_path, test_data);
+    
+    // Test: Create async lister
+    auto lister_id = co_await op->lister(base_dir);
+    auto lister = opendal::async::Lister(lister_id);
+    
+    // Test: Iterate through all entries
+    auto found_entries = std::vector<std::string>{};
+    while (true) {
+      auto entry_path = co_await lister.next();
+      if (entry_path.empty()) {
+        break; // End of iteration
+      }
+      found_entries.push_back(std::string(entry_path));
+    }
+    
+    // Verify we found the expected entries
+    EXPECT_EQ(found_entries.size(), 5); // 3 files + 1 subdir + 1 more (presumably base_dir itself)
+    
+    bool found_file1 = false;
+    bool found_file2 = false;
+    bool found_file3 = false;
+    bool found_subdir = false;
+    for (const auto& entry : found_entries) {
+      if (std::string(entry).ends_with("file1.txt")) found_file1 = true;
+      if (std::string(entry).ends_with("file2.txt")) found_file2 = true;
+      if (std::string(entry).ends_with("file3.txt")) found_file3 = true;
+      if (std::string(entry).ends_with("subdir/")) found_subdir = true;
+    }
+    
+    EXPECT_TRUE(found_file1);
+    EXPECT_TRUE(found_file2);
+    EXPECT_TRUE(found_file3);
+    EXPECT_TRUE(found_subdir);
+    
+    // Test: Empty directory listing
+    const auto empty_dir = "test_async_lister_empty/";
+    co_await op->create_dir(empty_dir);
+    
+    auto empty_lister_id = co_await op->lister(empty_dir);
+    auto empty_lister = opendal::async::Lister(empty_lister_id);
+    
+    auto first_entry = co_await empty_lister.next();    
+    // Test: Move semantics
+    auto lister_id2 = co_await op->lister(base_dir);
+    auto lister2 = opendal::async::Lister(lister_id2);
+    opendal::async::Lister moved_lister = std::move(lister2);
+    
+    // Should be able to use moved lister
+    auto moved_entry = co_await moved_lister.next();
+    EXPECT_FALSE(moved_entry.empty()); // Should find at least one entry
+    
+    // Clean up
+    co_await op->remove_all(base_dir);
+    co_await op->remove_all(empty_dir);
+    co_return;
+  }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncListerErrorHandlingTest) {
+  cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+    // Test: Listing non-existent directory
+    auto lister_id = co_await op->lister("non_existent_directory/");
+    auto lister = opendal::async::Lister(lister_id);
+    auto entry = co_await lister.next();
+    EXPECT_TRUE(entry.empty());
+    
+    co_return;
+  }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncReaderListerIntegrationTest) {
+  cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+    const auto base_dir = "test_integration/";
+    const auto large_file = "test_integration/large_file.bin";
+    
+    // Create a larger file for more realistic testing
+    std::vector<uint8_t> large_data;
+    large_data.reserve(10000);
+    for (int i = 0; i < 10000; ++i) {
+      large_data.push_back(static_cast<uint8_t>(i % 256));
+    }
+    
+    // Set up test environment
+    co_await op->create_dir(base_dir);
+    co_await op->write(large_file, large_data);
+    
+    // Test: Use lister to find the file, then reader to read it
+    auto lister_id = co_await op->lister(base_dir);
+    auto lister = opendal::async::Lister(lister_id);
+    
+    std::string found_file;
+    while (true) {
+      auto entry = co_await lister.next();
+      if (entry.empty()) break;
+      
+      if (std::string(entry).ends_with("large_file.bin")) {
+        found_file = std::string(entry);
+        break;
+      }
+    }
+    
+    EXPECT_FALSE(found_file.empty());
+    
+    // Now read the found file using async reader
+    auto reader_id = co_await op->reader(large_file);
+    auto reader = opendal::async::Reader(reader_id);
+    
+    // Read in multiple chunks to test streaming
+    const size_t chunk_size = 1000;
+    auto reconstructed_data = std::vector<uint8_t>{};
+    
+    for (size_t offset = 0; offset < large_data.size(); offset += chunk_size) {
+      size_t read_size = std::min(chunk_size, large_data.size() - offset);
+      auto chunk = co_await reader.read(offset, read_size);
+      reconstructed_data.insert(reconstructed_data.end(), chunk.begin(), 
chunk.end());
+    }
+    
+    // Verify the data integrity
+    EXPECT_EQ(reconstructed_data.size(), large_data.size());
+    for (size_t i = 0; i < large_data.size(); ++i) {
+      EXPECT_EQ(reconstructed_data[i], large_data[i]);
+    }
+    
+    // Clean up
+    co_await op->remove_all(base_dir);
+    co_return;
+  }());
+}

Reply via email to