pitrou commented on a change in pull request #8818:
URL: https://github.com/apache/arrow/pull/8818#discussion_r538196698
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1080,6 +1082,134 @@ void FileObjectToInfo(const S3Model::Object& obj,
FileInfo* info) {
info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
+ using ResultHandler = std::function<Status(const std::string& prefix,
+ const
S3Model::ListObjectsV2Result&)>;
+ using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
+ using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
+
+ Aws::S3::S3Client* client_;
+ const std::string bucket_;
+ const std::string base_dir_;
+ const int32_t max_keys_;
+ const ResultHandler result_handler_;
+ const ErrorHandler error_handler_;
+ const RecursionHandler recursion_handler_;
+
+ template <typename... Args>
+ static Status Walk(Args&&... args) {
+ auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
+ return self->DoWalk();
+ }
+
+ TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string
base_dir,
+ int32_t max_keys, ResultHandler result_handler, ErrorHandler
error_handler,
+ RecursionHandler recursion_handler)
+ : client_(std::move(client)),
+ bucket_(std::move(bucket)),
+ base_dir_(std::move(base_dir)),
+ max_keys_(max_keys),
+ result_handler_(std::move(result_handler)),
+ error_handler_(std::move(error_handler)),
+ recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+ std::mutex mutex_;
+ Future<> future_;
+ std::atomic<int32_t> num_in_flight_;
+
+ Status DoWalk() {
+ future_ = decltype(future_)::Make();
+ num_in_flight_ = 0;
+ WalkChild(base_dir_, /*nesting_depth=*/0);
+ // When this returns, ListObjectsV2 tasks either have finished or will
exit early
+ return future_.status();
+ }
+
+ bool is_finished() const { return future_.is_finished(); }
+
+ void ListObjectsFinished(Status st) {
+ const auto in_flight = --num_in_flight_;
+ if (!st.ok() || !in_flight) {
+ future_.MarkFinished(std::move(st));
+ }
+ }
+
+ struct ListObjectsV2Handler {
+ std::shared_ptr<TreeWalker> walker;
+ std::string prefix;
+ int32_t nesting_depth;
+ S3Model::ListObjectsV2Request req;
+
+ void operator()(const Aws::S3::S3Client*, const
S3Model::ListObjectsV2Request&,
+ const S3Model::ListObjectsV2Outcome& outcome,
+ const std::shared_ptr<const
Aws::Client::AsyncCallerContext>&) {
+ // Serialize calls to operation-specific handlers
+ std::unique_lock<std::mutex> guard(walker->mutex_);
+ if (walker->is_finished()) {
+ // Early exit: avoid executing handlers if DoWalk() returned
+ return;
+ }
+ if (!outcome.IsSuccess()) {
+ Status st = walker->error_handler_(outcome.GetError());
+ walker->ListObjectsFinished(std::move(st));
+ return;
+ }
+ const auto& result = outcome.GetResult();
+ bool recurse = result.GetCommonPrefixes().size() > 0;
+ if (recurse) {
+ auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
+ if (!maybe_recurse.ok()) {
+ walker->ListObjectsFinished(maybe_recurse.status());
+ return;
+ }
+ recurse &= *maybe_recurse;
+ }
+ Status st = walker->result_handler_(prefix, result);
+ if (!st.ok()) {
+ walker->ListObjectsFinished(std::move(st));
+ return;
+ }
+ if (recurse) {
+ walker->WalkChildren(result, nesting_depth + 1);
+ }
+ if (result.GetIsTruncated()) {
+ DCHECK(!result.GetNextContinuationToken().empty());
+ req.SetContinuationToken(result.GetNextContinuationToken());
+ walker->client_->ListObjectsV2Async(req, *this);
+ } else {
+ walker->ListObjectsFinished(Status::OK());
+ }
+ }
+
+ void Start() {
+ req.SetBucket(ToAwsString(walker->bucket_));
+ if (!prefix.empty()) {
+ req.SetPrefix(ToAwsString(prefix) + kSep);
+ }
+ req.SetDelimiter(Aws::String() + kSep);
+ req.SetMaxKeys(walker->max_keys_);
+ walker->client_->ListObjectsV2Async(req, *this);
+ }
+ };
+
+ void WalkChild(std::string key, int32_t nesting_depth) {
+ ListObjectsV2Handler handler{shared_from_this(), std::move(key),
nesting_depth, {}};
+ ++num_in_flight_;
+ handler.Start();
Review comment:
`ListObjectsV2Handler::operator()` receives the request as a const reference,
though. By contrast, `WalkChild` has no need for the request object except
to pass it to the handler.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]