This is an automated email from the ASF dual-hosted git repository.
asukaminato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 5bfd360db feat(bindings/cpp): cpp async op && reader, lister (#6228)
5bfd360db is described below
commit 5bfd360db9adbd0d6f33b62ad295c0aa1cba8e0e
Author: Asuka Minato <[email protected]>
AuthorDate: Mon Jun 16 19:47:57 2025 +0900
feat(bindings/cpp): cpp async op && reader, lister (#6228)
* add list support
* add more async op
* try add tests
* fmt
* try fix
* use co_await
* see whether other works
* skip copy test
* try fix
* try ?
* pass the test
* rm
* async reader / lister
* fix some comment
* not in Reader &Reader::operator=(Reader &&other).
* try add valgrind for async
* fix conflict
* rm valgrind
---
.github/workflows/ci_bindings_cpp.yml | 11 +-
bindings/cpp/Cargo.toml | 1 +
bindings/cpp/README.md | 1 +
bindings/cpp/include/async_defs.hpp | 5 +
bindings/cpp/include/opendal_async.hpp | 103 +++++++++++
bindings/cpp/src/async.rs | 242 +++++++++++++++++++++++++
bindings/cpp/src/opendal_async.cpp | 107 +++++++++++
bindings/cpp/tests/async_test.cpp | 315 +++++++++++++++++++++++++++++++++
8 files changed, 784 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/ci_bindings_cpp.yml
b/.github/workflows/ci_bindings_cpp.yml
index 709297c4b..4765e1de9 100644
--- a/.github/workflows/ci_bindings_cpp.yml
+++ b/.github/workflows/ci_bindings_cpp.yml
@@ -84,7 +84,16 @@ jobs:
cmake -GNinja -DOPENDAL_DEV=ON -DOPENDAL_ENABLE_ASYNC=ON
-DCMAKE_CXX_COMPILER=clang++-18 ..
ninja
./opendal_cpp_test
-
+
+ - name: test async with valgrind
+ working-directory: "bindings/cpp"
+ run: |
+ mkdir build-valgrind-async
+ cd build-valgrind-async
+ cmake -GNinja -DOPENDAL_ENABLE_ASYNC=ON -DOPENDAL_ENABLE_TESTING=ON
-DCMAKE_CXX_COMPILER=clang++-18 ..
+ ninja
+ valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes
--verbose ./opendal_cpp_test
+
- name: Build Cpp examples
working-directory: "examples/cpp"
run: |
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index c2861de61..8497d439a 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -34,6 +34,7 @@ anyhow = "1.0"
chrono = "0.4"
cxx = "1.0"
cxx-async = { version = "0.1.2", optional = true }
+futures = "0.3"
# this crate won't be published, we always use the local version
opendal = { version = ">=0", path = "../../core", features = [
"blocking",
diff --git a/bindings/cpp/README.md b/bindings/cpp/README.md
index 46501c540..6dc7f0084 100644
--- a/bindings/cpp/README.md
+++ b/bindings/cpp/README.md
@@ -101,6 +101,7 @@ make docs
- `OPENDAL_ENABLE_DOCUMENTATION`: Enable documentation. Default: `OFF`
- `OPENDAL_DOCS_ONLY`: Only build documentation. Default: `OFF`
- `OPENDAL_ENABLE_TESTING`: Enable testing. Default: `OFF`
+- `OPENDAL_ENABLE_ASYNC`: Enable async support. Default: `OFF`
## License and Trademarks
diff --git a/bindings/cpp/include/async_defs.hpp
b/bindings/cpp/include/async_defs.hpp
index c8a831d0c..3b2e6c193 100644
--- a/bindings/cpp/include/async_defs.hpp
+++ b/bindings/cpp/include/async_defs.hpp
@@ -24,3 +24,8 @@
CXXASYNC_DEFINE_FUTURE(rust::Vec<uint8_t>, opendal, ffi, async,
RustFutureRead);
CXXASYNC_DEFINE_FUTURE(void, opendal, ffi, async, RustFutureWrite);
+CXXASYNC_DEFINE_FUTURE(rust::Vec<rust::String>, opendal, ffi, async,
RustFutureList);
+CXXASYNC_DEFINE_FUTURE(bool, opendal, ffi, async, RustFutureBool);
+CXXASYNC_DEFINE_FUTURE(size_t, opendal, ffi, async, RustFutureReaderId);
+CXXASYNC_DEFINE_FUTURE(size_t, opendal, ffi, async, RustFutureListerId);
+CXXASYNC_DEFINE_FUTURE(rust::String, opendal, ffi, async,
RustFutureEntryOption);
diff --git a/bindings/cpp/include/opendal_async.hpp
b/bindings/cpp/include/opendal_async.hpp
index 45524df5b..1f5deca10 100644
--- a/bindings/cpp/include/opendal_async.hpp
+++ b/bindings/cpp/include/opendal_async.hpp
@@ -19,8 +19,12 @@
#pragma once
+#include <cstdint>
#include <optional>
#include <span>
+#include <string>
+#include <string_view>
+#include <unordered_map>
#include "async.rs.h"
#include "async_defs.hpp"
@@ -47,8 +51,107 @@ class Operator {
using WriteFuture = opendal::ffi::async::RustFutureWrite;
WriteFuture write(std::string_view path, std::span<uint8_t> data);
+ using ListFuture = opendal::ffi::async::RustFutureList;
+ ListFuture list(std::string_view path);
+
+ using ExistsFuture = opendal::ffi::async::RustFutureBool;
+ ExistsFuture exists(std::string_view path);
+
+ using CreateDirFuture = opendal::ffi::async::RustFutureWrite;
+ CreateDirFuture create_dir(std::string_view path);
+
+ using CopyFuture = opendal::ffi::async::RustFutureWrite;
+ CopyFuture copy(std::string_view from, std::string_view to);
+
+ using RenameFuture = opendal::ffi::async::RustFutureWrite;
+ RenameFuture rename(std::string_view from, std::string_view to);
+
+ using DeleteFuture = opendal::ffi::async::RustFutureWrite;
+ DeleteFuture delete_path(std::string_view path);
+
+ using RemoveAllFuture = opendal::ffi::async::RustFutureWrite;
+ RemoveAllFuture remove_all(std::string_view path);
+
+ using ReaderFuture = opendal::ffi::async::RustFutureReaderId;
+ ReaderFuture reader(std::string_view path);
+
+ using ListerFuture = opendal::ffi::async::RustFutureListerId;
+ ListerFuture lister(std::string_view path);
+
private:
rust::Box<opendal::ffi::async::Operator> operator_;
};
+/**
+ * @class Reader
+ * @brief Async Reader is designed to read data from a specific path in an
asynchronous manner.
+ * @details It provides streaming read operations with range support.
+ */
+class Reader {
+ public:
+ // Disable copy and assign
+ Reader(const Reader &) = delete;
+ Reader &operator=(const Reader &) = delete;
+
+ // Enable move
+ Reader(Reader &&other) noexcept;
+ Reader &operator=(Reader &&other) noexcept;
+ ~Reader() noexcept;
+
+ // Constructor from ID (for tests and advanced usage)
+ explicit Reader(size_t reader_id) noexcept;
+
+ using ReadFuture = opendal::ffi::async::RustFutureRead;
+
+ /**
+ * @brief Read data from the specified range
+ * @param start Start offset in bytes
+ * @param len Number of bytes to read
+ * @return Future that resolves to the read data
+ */
+ ReadFuture read(uint64_t start, uint64_t len);
+
+ private:
+ friend class Operator;
+
+ void destroy() noexcept;
+
+ size_t reader_id_{0};
+};
+
+/**
+ * @class Lister
+ * @brief Async Lister is designed to list entries at a specified path in an
asynchronous manner.
+ * @details It provides streaming iteration over directory entries.
+ */
+class Lister {
+ public:
+ // Disable copy and assign
+ Lister(const Lister &) = delete;
+ Lister &operator=(const Lister &) = delete;
+
+ // Enable move
+ Lister(Lister &&other) noexcept;
+ Lister &operator=(Lister &&other) noexcept;
+ ~Lister() noexcept;
+
+ // Constructor from ID (for tests and advanced usage)
+ explicit Lister(size_t lister_id) noexcept;
+
+ using NextFuture = opendal::ffi::async::RustFutureEntryOption;
+
+ /**
+ * @brief Get the next entry in the listing
+ * @return Future that resolves to the next entry path, or empty string if
no more entries
+ */
+ NextFuture next();
+
+ private:
+ friend class Operator;
+
+ void destroy() noexcept;
+
+ size_t lister_id_{0};
+};
+
} // namespace opendal::async
diff --git a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs
index b4f94899e..323a2b69b 100644
--- a/bindings/cpp/src/async.rs
+++ b/bindings/cpp/src/async.rs
@@ -22,6 +22,8 @@ use std::collections::HashMap;
use std::future::Future;
use std::ops::Deref;
use std::str::FromStr;
+use std::sync::{Arc, OnceLock};
+use tokio::sync::Mutex;
#[cxx::bridge(namespace = "opendal::ffi::async")]
mod ffi {
@@ -37,17 +39,47 @@ mod ffi {
op: *const Operator,
}
+ pub struct ReaderPtr {
+ id: usize,
+ }
+
+ pub struct ListerPtr {
+ id: usize,
+ }
+
extern "Rust" {
type Operator;
+ type Reader;
+ type Lister;
fn new_operator(scheme: &str, configs: Vec<HashMapValue>) ->
Result<Box<Operator>>;
unsafe fn operator_read(op: OperatorPtr, path: String) ->
RustFutureRead;
unsafe fn operator_write(op: OperatorPtr, path: String, bs: Vec<u8>)
-> RustFutureWrite;
+ unsafe fn operator_list(op: OperatorPtr, path: String) ->
RustFutureList;
+ unsafe fn operator_exists(op: OperatorPtr, path: String) ->
RustFutureBool;
+ unsafe fn operator_create_dir(op: OperatorPtr, path: String) ->
RustFutureWrite;
+ unsafe fn operator_copy(op: OperatorPtr, from: String, to: String) ->
RustFutureWrite;
+ unsafe fn operator_rename(op: OperatorPtr, from: String, to: String)
-> RustFutureWrite;
+ unsafe fn operator_delete(op: OperatorPtr, path: String) ->
RustFutureWrite;
+ unsafe fn operator_remove_all(op: OperatorPtr, path: String) ->
RustFutureWrite;
+ unsafe fn operator_reader(op: OperatorPtr, path: String) ->
RustFutureReaderId;
+ unsafe fn operator_lister(op: OperatorPtr, path: String) ->
RustFutureListerId;
+
+ unsafe fn reader_read(reader: ReaderPtr, start: u64, len: u64) ->
RustFutureRead;
+ unsafe fn lister_next(lister: ListerPtr) -> RustFutureEntryOption;
+
+ fn delete_reader(reader: ReaderPtr);
+ fn delete_lister(lister: ListerPtr);
}
extern "C++" {
type RustFutureRead = super::RustFutureRead;
type RustFutureWrite = super::RustFutureWrite;
+ type RustFutureList = super::RustFutureList;
+ type RustFutureBool = super::RustFutureBool;
+ type RustFutureReaderId = super::RustFutureReaderId;
+ type RustFutureListerId = super::RustFutureListerId;
+ type RustFutureEntryOption = super::RustFutureEntryOption;
}
}
@@ -61,7 +93,62 @@ unsafe impl Future for RustFutureWrite {
type Output = ();
}
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureList {
+ type Output = Vec<String>;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureBool {
+ type Output = bool;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureReaderId {
+ type Output = usize;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureListerId {
+ type Output = usize;
+}
+
+#[cxx_async::bridge(namespace = opendal::ffi::async)]
+unsafe impl Future for RustFutureEntryOption {
+ type Output = String;
+}
+
pub struct Operator(od::Operator);
+pub struct Reader {
+ reader: Arc<od::Reader>,
+ id: usize,
+}
+pub struct Lister {
+ lister: Arc<Mutex<od::Lister>>,
+ id: usize,
+}
+
+// Global storage for readers and listers to avoid Send issues with raw
pointers
+static READER_STORAGE: OnceLock<Mutex<HashMap<usize, Arc<od::Reader>>>> =
OnceLock::new();
+static READER_COUNTER: OnceLock<Mutex<usize>> = OnceLock::new();
+static LISTER_STORAGE: OnceLock<Mutex<HashMap<usize, Arc<Mutex<od::Lister>>>>>
= OnceLock::new();
+static LISTER_COUNTER: OnceLock<Mutex<usize>> = OnceLock::new();
+
+fn get_reader_storage() -> &'static Mutex<HashMap<usize, Arc<od::Reader>>> {
+ READER_STORAGE.get_or_init(|| Mutex::new(HashMap::new()))
+}
+
+fn get_reader_counter() -> &'static Mutex<usize> {
+ READER_COUNTER.get_or_init(|| Mutex::new(0))
+}
+
+fn get_lister_storage() -> &'static Mutex<HashMap<usize,
Arc<Mutex<od::Lister>>>> {
+ LISTER_STORAGE.get_or_init(|| Mutex::new(HashMap::new()))
+}
+
+fn get_lister_counter() -> &'static Mutex<usize> {
+ LISTER_COUNTER.get_or_init(|| Mutex::new(0))
+}
fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) ->
Result<Box<Operator>> {
let scheme = od::Scheme::from_str(scheme)?;
@@ -85,6 +172,8 @@ impl Deref for ffi::OperatorPtr {
}
unsafe impl Send for ffi::OperatorPtr {}
+unsafe impl Send for ffi::ReaderPtr {}
+unsafe impl Send for ffi::ListerPtr {}
unsafe fn operator_read(op: ffi::OperatorPtr, path: String) -> RustFutureRead {
RustFutureRead::fallible(async move {
@@ -105,3 +194,156 @@ unsafe fn operator_write(op: ffi::OperatorPtr, path:
String, bs: Vec<u8>) -> Rus
.map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))
})
}
+
+unsafe fn operator_list(op: ffi::OperatorPtr, path: String) -> RustFutureList {
+ RustFutureList::fallible(async move {
+ let entries =
+ op.0.list(&path)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+ Ok(entries.into_iter().map(|e| e.path().to_string()).collect())
+ })
+}
+
+unsafe fn operator_exists(op: ffi::OperatorPtr, path: String) ->
RustFutureBool {
+ RustFutureBool::fallible(async move {
+ op.0.exists(&path)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))
+ })
+}
+
+unsafe fn operator_create_dir(op: ffi::OperatorPtr, path: String) ->
RustFutureWrite {
+ RustFutureWrite::fallible(async move {
+ op.0.create_dir(&path)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))
+ })
+}
+
+unsafe fn operator_copy(op: ffi::OperatorPtr, from: String, to: String) ->
RustFutureWrite {
+ RustFutureWrite::fallible(async move {
+ op.0.copy(&from, &to)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))
+ })
+}
+
+unsafe fn operator_rename(op: ffi::OperatorPtr, from: String, to: String) ->
RustFutureWrite {
+ RustFutureWrite::fallible(async move {
+ op.0.rename(&from, &to)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))
+ })
+}
+
+unsafe fn operator_delete(op: ffi::OperatorPtr, path: String) ->
RustFutureWrite {
+ RustFutureWrite::fallible(async move {
+ op.0.delete(&path)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))
+ })
+}
+
+unsafe fn operator_remove_all(op: ffi::OperatorPtr, path: String) ->
RustFutureWrite {
+ RustFutureWrite::fallible(async move {
+ op.0.remove_all(&path)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))
+ })
+}
+
+unsafe fn operator_reader(op: ffi::OperatorPtr, path: String) ->
RustFutureReaderId {
+ RustFutureReaderId::fallible(async move {
+ let reader =
+ op.0.reader(&path)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+
+ // Store the reader in global storage and return an ID
+ let reader_arc = Arc::new(reader);
+ let mut counter = get_reader_counter().lock().await;
+ *counter += 1;
+ let id = *counter;
+
+ let mut storage = get_reader_storage().lock().await;
+ storage.insert(id, reader_arc);
+
+ Ok(id)
+ })
+}
+
+unsafe fn operator_lister(op: ffi::OperatorPtr, path: String) ->
RustFutureListerId {
+ RustFutureListerId::fallible(async move {
+ let lister =
+ op.0.lister(&path)
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+
+ // Store the lister in global storage and return an ID
+ let lister_arc = Arc::new(Mutex::new(lister));
+ let mut counter = get_lister_counter().lock().await;
+ *counter += 1;
+ let id = *counter;
+
+ let mut storage = get_lister_storage().lock().await;
+ storage.insert(id, lister_arc);
+
+ Ok(id)
+ })
+}
+
+unsafe fn reader_read(reader: ffi::ReaderPtr, start: u64, len: u64) ->
RustFutureRead {
+ RustFutureRead::fallible(async move {
+ let storage = get_reader_storage().lock().await;
+ let reader_arc = storage
+ .get(&reader.id)
+ .ok_or_else(|| CxxAsyncException::new("Invalid reader ID".into()))?
+ .clone();
+ drop(storage);
+
+ let buffer = reader_arc
+ .read(start..(start + len))
+ .await
+ .map_err(|e|
CxxAsyncException::new(e.to_string().into_boxed_str()))?;
+ Ok(buffer.to_vec())
+ })
+}
+
+unsafe fn lister_next(lister: ffi::ListerPtr) -> RustFutureEntryOption {
+ RustFutureEntryOption::fallible(async move {
+ use futures::TryStreamExt;
+
+ let storage = get_lister_storage().lock().await;
+ let lister_arc = storage
+ .get(&lister.id)
+ .ok_or_else(|| CxxAsyncException::new("Invalid lister ID".into()))?
+ .clone();
+ drop(storage);
+
+ let mut lister_guard = lister_arc.lock().await;
+ match lister_guard.try_next().await {
+ Ok(Some(entry)) => Ok(entry.path().to_string()),
+ Ok(None) => Ok(String::new()), // Empty string indicates end of
iteration
+ Err(e) =>
Err(CxxAsyncException::new(e.to_string().into_boxed_str())),
+ }
+ })
+}
+
+fn delete_reader(reader: ffi::ReaderPtr) {
+ // Use blocking lock since this is called from C++ destructors
+ if let Ok(mut storage) = get_reader_storage().try_lock() {
+ storage.remove(&reader.id);
+ }
+ // If we can't get the lock immediately, we'll just skip cleanup
+ // This is better than panicking in a destructor
+}
+
+fn delete_lister(lister: ffi::ListerPtr) {
+ // Use blocking lock since this is called from C++ destructors
+ if let Ok(mut storage) = get_lister_storage().try_lock() {
+ storage.remove(&lister.id);
+ }
+ // If we can't get the lock immediately, we'll just skip cleanup
+ // This is better than panicking in a destructor
+}
diff --git a/bindings/cpp/src/opendal_async.cpp
b/bindings/cpp/src/opendal_async.cpp
index 6ec2dccad..3999a8d0e 100644
--- a/bindings/cpp/src/opendal_async.cpp
+++ b/bindings/cpp/src/opendal_async.cpp
@@ -58,3 +58,110 @@ Operator::WriteFuture Operator::write(std::string_view path,
return opendal::ffi::async::operator_write(
opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path), vec);
}
+
+Operator::ListFuture Operator::list(std::string_view path) {
+ return opendal::ffi::async::operator_list(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::ExistsFuture Operator::exists(std::string_view path) {
+ return opendal::ffi::async::operator_exists(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::CreateDirFuture Operator::create_dir(std::string_view path) {
+ return opendal::ffi::async::operator_create_dir(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::CopyFuture Operator::copy(std::string_view from, std::string_view
to) {
+ return opendal::ffi::async::operator_copy(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(from),
+ RUST_STRING(to));
+}
+
+Operator::RenameFuture Operator::rename(std::string_view from,
std::string_view to) {
+ return opendal::ffi::async::operator_rename(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(from),
+ RUST_STRING(to));
+}
+
+Operator::DeleteFuture Operator::delete_path(std::string_view path) {
+ return opendal::ffi::async::operator_delete(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::RemoveAllFuture Operator::remove_all(std::string_view path) {
+ return opendal::ffi::async::operator_remove_all(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::ReaderFuture Operator::reader(std::string_view path) {
+ return opendal::ffi::async::operator_reader(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+Operator::ListerFuture Operator::lister(std::string_view path) {
+ return opendal::ffi::async::operator_lister(
+ opendal::ffi::async::OperatorPtr{&*operator_}, RUST_STRING(path));
+}
+
+// Reader implementation
+Reader::Reader(size_t reader_id) noexcept : reader_id_(reader_id) {}
+
+Reader::Reader(Reader &&other) noexcept : reader_id_(other.reader_id_) {
+ other.reader_id_ = 0;
+}
+
+Reader &Reader::operator=(Reader &&other) noexcept {
+ if (this != &other) {
+ reader_id_ = other.reader_id_;
+ other.reader_id_ = 0;
+ }
+ return *this;
+}
+
+Reader::~Reader() noexcept { destroy(); }
+
+void Reader::destroy() noexcept {
+ if (reader_id_ != 0) {
+ opendal::ffi::async::delete_reader(
+ opendal::ffi::async::ReaderPtr{reader_id_});
+ reader_id_ = 0;
+ }
+}
+
+Reader::ReadFuture Reader::read(uint64_t start, uint64_t len) {
+ return opendal::ffi::async::reader_read(
+ opendal::ffi::async::ReaderPtr{reader_id_}, start, len);
+}
+
+// Lister implementation
+Lister::Lister(size_t lister_id) noexcept : lister_id_(lister_id) {}
+
+Lister::Lister(Lister &&other) noexcept : lister_id_(other.lister_id_) {
+ other.lister_id_ = 0;
+}
+
+Lister &Lister::operator=(Lister &&other) noexcept {
+ if (this != &other) {
+ lister_id_ = other.lister_id_;
+ other.lister_id_ = 0;
+ }
+ return *this;
+}
+
+Lister::~Lister() noexcept { destroy(); }
+
+void Lister::destroy() noexcept {
+ if (lister_id_ != 0) {
+ opendal::ffi::async::delete_lister(
+ opendal::ffi::async::ListerPtr{lister_id_});
+ lister_id_ = 0;
+ }
+}
+
+Lister::NextFuture Lister::next() {
+ return opendal::ffi::async::lister_next(
+ opendal::ffi::async::ListerPtr{lister_id_});
+}
diff --git a/bindings/cpp/tests/async_test.cpp
b/bindings/cpp/tests/async_test.cpp
index dc25c2068..8e7ee7ca6 100644
--- a/bindings/cpp/tests/async_test.cpp
+++ b/bindings/cpp/tests/async_test.cpp
@@ -57,3 +57,318 @@ TEST_F(AsyncOpendalTest, BasicTest) {
co_return;
}());
}
+
+TEST_F(AsyncOpendalTest, AsyncOperationsTest) {
+ cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+ const auto dir_path = "test_async_dir/";
+ const auto file_path = "test_async_dir/test_file.txt";
+ std::vector<uint8_t> file_content = {1, 2, 3, 4, 5};
+
+ // Create directory and check existence
+ co_await op->create_dir(dir_path);
+ auto dir_exists = co_await op->exists(dir_path);
+ EXPECT_TRUE(dir_exists);
+
+ // Check non-existent file
+ auto non_existent_exists = co_await op->exists("non_existent_file");
+ EXPECT_FALSE(non_existent_exists);
+
+ // Write a file and check its existence
+ co_await op->write(file_path, file_content);
+ auto file_exists = co_await op->exists(file_path);
+ EXPECT_TRUE(file_exists);
+
+ const auto copied_file_path = "test_async_dir/copied_file.txt";
+ co_await op->write(copied_file_path, file_content);
+ auto copied_file_exists = co_await op->exists(copied_file_path);
+ EXPECT_TRUE(copied_file_exists);
+ auto copied_content_rust_vec = co_await op->read(copied_file_path);
+ EXPECT_EQ(file_content.size(), copied_content_rust_vec.size());
+ for (size_t i = 0; i < file_content.size(); ++i) {
+ EXPECT_EQ(file_content[i], copied_content_rust_vec[i]);
+ }
+
+ const auto renamed_file_path = "test_async_dir/renamed_file.txt";
+ co_await op->write(renamed_file_path, file_content);
+ co_await op->delete_path(copied_file_path);
+ auto old_copied_exists = co_await op->exists(copied_file_path);
+ EXPECT_FALSE(old_copied_exists);
+ auto renamed_file_exists = co_await op->exists(renamed_file_path);
+ EXPECT_TRUE(renamed_file_exists);
+ auto renamed_content_rust_vec = co_await op->read(renamed_file_path);
+ EXPECT_EQ(file_content.size(), renamed_content_rust_vec.size());
+ for (size_t i = 0; i < file_content.size(); ++i) {
+ EXPECT_EQ(file_content[i], renamed_content_rust_vec[i]);
+ }
+
+ // Delete the renamed file and check non-existence
+ co_await op->delete_path(renamed_file_path);
+ auto deleted_exists = co_await op->exists(renamed_file_path);
+ EXPECT_FALSE(deleted_exists);
+
+ // Create another file in the directory
+ const auto another_file_path = "test_async_dir/another_file.txt";
+ co_await op->write(another_file_path, file_content);
+ auto another_file_exists = co_await op->exists(another_file_path);
+ EXPECT_TRUE(another_file_exists);
+
+ // Remove the entire directory and check non-existence of the directory
and its contents
+ co_await op->remove_all(dir_path);
+ auto dir_after_remove_exists = co_await op->exists(dir_path);
+ EXPECT_FALSE(dir_after_remove_exists);
+ auto file_in_removed_dir_exists = co_await op->exists(file_path); //
Original file path
+ EXPECT_FALSE(file_in_removed_dir_exists);
+ auto another_file_in_removed_dir_exists = co_await
op->exists(another_file_path);
+ EXPECT_FALSE(another_file_in_removed_dir_exists);
+
+ // Test listing after cleaning up
+ const auto list_dir_path = "test_list_dir/";
+ const auto list_file1_path = "test_list_dir/file1.txt";
+ const auto list_file2_path = "test_list_dir/file2.txt";
+ const auto list_subdir_path = "test_list_dir/subdir/";
+
+ co_await op->create_dir(list_dir_path);
+ co_await op->write(list_file1_path, file_content);
+ co_await op->write(list_file2_path, file_content);
+ co_await op->create_dir(list_subdir_path);
+
+ auto listed_entries = co_await op->list(list_dir_path);
+ EXPECT_EQ(listed_entries.size(), 4);
+
+ bool found_file1 = false;
+ bool found_file2 = false;
+ bool found_subdir = false;
+ for (const auto& entry : listed_entries) {
+ if (std::string(entry).ends_with("file1.txt")) found_file1 = true;
+ if (std::string(entry).ends_with("file2.txt")) found_file2 = true;
+ if (std::string(entry).ends_with("subdir/")) found_subdir = true;
+ }
+ EXPECT_TRUE(found_file1);
+ EXPECT_TRUE(found_file2);
+ EXPECT_TRUE(found_subdir);
+
+ // Clean up list test directory
+ co_await op->remove_all(list_dir_path);
+ co_return;
+ }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncReaderTest) {
+ cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+ const auto file_path = "test_async_reader.txt";
+
+ // Create test data - larger to test range reads
+ std::vector<uint8_t> test_data;
+ for (int i = 0; i < 1000; ++i) {
+ test_data.push_back(static_cast<uint8_t>(i % 256));
+ }
+
+ // Write the test file
+ co_await op->write(file_path, test_data);
+
+ // Test: Create async reader
+ auto reader_id = co_await op->reader(file_path);
+ opendal::async::Reader reader(reader_id);
+
+ // Test: Read full file in one go
+ auto full_data = co_await reader.read(0, test_data.size());
+ EXPECT_EQ(full_data.size(), test_data.size());
+ for (size_t i = 0; i < test_data.size(); ++i) {
+ EXPECT_EQ(full_data[i], test_data[i]);
+ }
+
+ // Test: Read in chunks
+ const size_t chunk_size = 100;
+ std::vector<uint8_t> chunked_data;
+
+ for (size_t offset = 0; offset < test_data.size(); offset += chunk_size) {
+ size_t read_size = std::min(chunk_size, test_data.size() - offset);
+ auto chunk = co_await reader.read(offset, read_size);
+
+ EXPECT_EQ(chunk.size(), read_size);
+ chunked_data.insert(chunked_data.end(), chunk.begin(), chunk.end());
+ }
+
+ // Verify chunked read matches original data
+ EXPECT_EQ(chunked_data.size(), test_data.size());
+ for (size_t i = 0; i < test_data.size(); ++i) {
+ EXPECT_EQ(chunked_data[i], test_data[i]);
+ }
+
+ // Test: Read specific range
+ auto range_data = co_await reader.read(100, 50);
+ EXPECT_EQ(range_data.size(), 50);
+ for (size_t i = 0; i < 50; ++i) {
+ EXPECT_EQ(range_data[i], test_data[100 + i]);
+ }
+
+ // Test: Read from end of file
+ auto end_data = co_await reader.read(test_data.size() - 10, 10);
+ EXPECT_EQ(end_data.size(), 10);
+ for (size_t i = 0; i < 10; ++i) {
+ EXPECT_EQ(end_data[i], test_data[test_data.size() - 10 + i]);
+ }
+
+ // Test: Move semantics
+ auto reader_id2 = co_await op->reader(file_path);
+ opendal::async::Reader reader2(reader_id2);
+ opendal::async::Reader moved_reader = std::move(reader2);
+
+ auto moved_data = co_await moved_reader.read(0, 100);
+ EXPECT_EQ(moved_data.size(), 100);
+ for (size_t i = 0; i < 100; ++i) {
+ EXPECT_EQ(moved_data[i], test_data[i]);
+ }
+
+ // Clean up
+ co_await op->delete_path(file_path);
+ co_return;
+ }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncListerTest) {
+ cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+ const auto base_dir = "test_async_lister/";
+ const auto file1_path = "test_async_lister/file1.txt";
+ const auto file2_path = "test_async_lister/file2.txt";
+ const auto file3_path = "test_async_lister/file3.txt";
+ const auto subdir_path = "test_async_lister/subdir/";
+ const auto subdir_file_path = "test_async_lister/subdir/nested_file.txt";
+
+ auto test_data = std::vector<uint8_t>{1, 2, 3, 4, 5};
+
+ // Set up test directory structure
+ co_await op->create_dir(base_dir);
+ co_await op->write(file1_path, test_data);
+ co_await op->write(file2_path, test_data);
+ co_await op->write(file3_path, test_data);
+ co_await op->create_dir(subdir_path);
+ co_await op->write(subdir_file_path, test_data);
+
+ // Test: Create async lister
+ auto lister_id = co_await op->lister(base_dir);
+ auto lister = opendal::async::Lister(lister_id);
+
+ // Test: Iterate through all entries
+ auto found_entries = std::vector<std::string>{};
+ while (true) {
+ auto entry_path = co_await lister.next();
+ if (entry_path.empty()) {
+ break; // End of iteration
+ }
+ found_entries.push_back(std::string(entry_path));
+ }
+
+ // Verify we found the expected entries
+ EXPECT_EQ(found_entries.size(), 5); // At least 4 entries (3 files + 1
subdir)
+
+ bool found_file1 = false;
+ bool found_file2 = false;
+ bool found_file3 = false;
+ bool found_subdir = false;
+ for (const auto& entry : found_entries) {
+ if (std::string(entry).ends_with("file1.txt")) found_file1 = true;
+ if (std::string(entry).ends_with("file2.txt")) found_file2 = true;
+ if (std::string(entry).ends_with("file3.txt")) found_file3 = true;
+ if (std::string(entry).ends_with("subdir/")) found_subdir = true;
+ }
+
+ EXPECT_TRUE(found_file1);
+ EXPECT_TRUE(found_file2);
+ EXPECT_TRUE(found_file3);
+ EXPECT_TRUE(found_subdir);
+
+ // Test: Empty directory listing
+ const auto empty_dir = "test_async_lister_empty/";
+ co_await op->create_dir(empty_dir);
+
+ auto empty_lister_id = co_await op->lister(empty_dir);
+ auto empty_lister = opendal::async::Lister(empty_lister_id);
+
+ auto first_entry = co_await empty_lister.next();
+ // Test: Move semantics
+ auto lister_id2 = co_await op->lister(base_dir);
+ auto lister2 = opendal::async::Lister(lister_id2);
+ opendal::async::Lister moved_lister = std::move(lister2);
+
+ // Should be able to use moved lister
+ auto moved_entry = co_await moved_lister.next();
+ EXPECT_FALSE(moved_entry.empty()); // Should find at least one entry
+
+ // Clean up
+ co_await op->remove_all(base_dir);
+ co_await op->remove_all(empty_dir);
+ co_return;
+ }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncListerErrorHandlingTest) {
+ cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+ // Test: Listing non-existent directory
+ auto lister_id = co_await op->lister("non_existent_directory/");
+ auto lister = opendal::async::Lister(lister_id);
+ auto entry = co_await lister.next();
+ EXPECT_TRUE(entry.empty());
+
+ co_return;
+ }());
+}
+
+TEST_F(AsyncOpendalTest, AsyncReaderListerIntegrationTest) {
+ cppcoro::sync_wait([&]() -> cppcoro::task<void> {
+ const auto base_dir = "test_integration/";
+ const auto large_file = "test_integration/large_file.bin";
+
+ // Create a larger file for more realistic testing
+ std::vector<uint8_t> large_data;
+ large_data.reserve(10000);
+ for (int i = 0; i < 10000; ++i) {
+ large_data.push_back(static_cast<uint8_t>(i % 256));
+ }
+
+ // Set up test environment
+ co_await op->create_dir(base_dir);
+ co_await op->write(large_file, large_data);
+
+ // Test: Use lister to find the file, then reader to read it
+ auto lister_id = co_await op->lister(base_dir);
+ auto lister = opendal::async::Lister(lister_id);
+
+ std::string found_file;
+ while (true) {
+ auto entry = co_await lister.next();
+ if (entry.empty()) break;
+
+ if (std::string(entry).ends_with("large_file.bin")) {
+ found_file = std::string(entry);
+ break;
+ }
+ }
+
+ EXPECT_FALSE(found_file.empty());
+
+ // Now read the found file using async reader
+ auto reader_id = co_await op->reader(large_file);
+ auto reader = opendal::async::Reader(reader_id);
+
+ // Read in multiple chunks to test streaming
+ const size_t chunk_size = 1000;
+ auto reconstructed_data = std::vector<uint8_t>{};
+
+ for (size_t offset = 0; offset < large_data.size(); offset += chunk_size) {
+ size_t read_size = std::min(chunk_size, large_data.size() - offset);
+ auto chunk = co_await reader.read(offset, read_size);
+ reconstructed_data.insert(reconstructed_data.end(), chunk.begin(),
chunk.end());
+ }
+
+ // Verify the data integrity
+ EXPECT_EQ(reconstructed_data.size(), large_data.size());
+ for (size_t i = 0; i < large_data.size(); ++i) {
+ EXPECT_EQ(reconstructed_data[i], large_data[i]);
+ }
+
+ // Clean up
+ co_await op->remove_all(base_dir);
+ co_return;
+ }());
+}