pitrou commented on a change in pull request #11890:
URL: https://github.com/apache/arrow/pull/11890#discussion_r766001410



##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -414,6 +414,34 @@ TEST_F(GcsIntegrationTest, CreateDirRecursiveBucketAndFolder) {
   arrow::fs::AssertFileInfo(fs.get(), "new-bucket/", FileType::Directory);
 }
 
+TEST_F(GcsIntegrationTest, DeleteDirSuccess) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+  const char* const kTestFolders[] = {
+      "a/", "a/0/", "a/0/0/", "a/1/", "a/2/",
+  };
+  for (auto const* f : kTestFolders) {
+    const auto folder = PreexistingBucketPath() + f;
+    ASSERT_OK(fs->CreateDir(folder, true));
+    for (int i = 0; i != 64; ++i) {
+      const auto filename = folder + "test-file-" + std::to_string(i);
+      ASSERT_OK_AND_ASSIGN(auto w, fs->OpenOutputStream(filename, {}));
+      ASSERT_OK(w->Write(filename.data(), filename.size()));
+      ASSERT_OK(w->Close());
+    }
+  }
+
+  ASSERT_OK(fs->DeleteDir(PreexistingBucketPath() + kTestFolders[0]));
+
+  for (auto const* f : kTestFolders) {
+    const auto folder = PreexistingBucketPath() + f;
+    arrow::fs::AssertFileInfo(fs.get(), folder, FileType::NotFound);
+    for (int i = 0; i != 64; ++i) {
+      const auto filename = folder + "test-file-" + std::to_string(i);
+      arrow::fs::AssertFileInfo(fs.get(), filename, FileType::NotFound);
+    }
+  }
+}

Review comment:
       Should you test that other paths in the bucket still exist?
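
   For example (untested sketch, reusing only the helpers already visible in this test; the `keep-me/` path and the `survivor_*` names are made up), the test could create a sibling path up front and assert that it survives the delete:
   ```c++
     // Hypothetical sibling path outside kTestFolders[0]; it must survive DeleteDir().
     const auto survivor_folder = PreexistingBucketPath() + "keep-me/";
     ASSERT_OK(fs->CreateDir(survivor_folder, true));
     const auto survivor_file = survivor_folder + "test-file-0";
     ASSERT_OK_AND_ASSIGN(auto w, fs->OpenOutputStream(survivor_file, {}));
     ASSERT_OK(w->Write(survivor_file.data(), survivor_file.size()));
     ASSERT_OK(w->Close());

     ASSERT_OK(fs->DeleteDir(PreexistingBucketPath() + kTestFolders[0]));

     // The sibling folder and its file should be untouched.
     arrow::fs::AssertFileInfo(fs.get(), survivor_folder, FileType::Directory);
     arrow::fs::AssertFileInfo(fs.get(), survivor_file, FileType::File);
   ```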

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -342,6 +344,53 @@ class GcsFileSystem::Impl {
     return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object));
   }
 
+  Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) {
+    RETURN_NOT_OK(DeleteDirContents(p, io_context));
+    if (!p.object.empty()) {
+      return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object));
+    }
+    return internal::ToArrowStatus(client_.DeleteBucket(p.bucket));
+  }
+
+  Status DeleteDirContents(const GcsPath& p, const io::IOContext& io_context) {
+    // Deleting large directories can be fairly slow, so we parallelize the
+    // operation by submitting multiple delete requests to the I/O context's
+    // executor. A simple form of flow control limits the number of operations
+    // running in parallel.
+
+    auto async_delete =
+        [&p](gcs::Client& client,
+             google::cloud::StatusOr<gcs::ObjectMetadata> o) -> google::cloud::Status {
+      if (!o) return std::move(o).status();
+      // The list includes the directory, skip it. DeleteDir() takes care of it.
+      if (o->bucket() == p.bucket && o->name() == p.object) return {};
+      return client.DeleteObject(o->bucket(), o->name(),
+                                 gcs::Generation(o->generation()));
+    };
+
+    using Future = arrow::Future<google::cloud::Status>;
+    std::vector<Result<Future>> submitted;
+    // This iterates over all the objects, and schedules parallel deletes.
+    auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object);
+    for (auto& o : client_.ListObjects(p.bucket, prefix)) {
+      submitted.push_back(

Review comment:
       By using `DeferNotOk`, you could instead append to a `std::vector<Future>`, which will simplify the collection code below.
   
   For example (untested):
   ```c++
       std::vector<Future> submitted;
       // This iterates over all the objects, and schedules parallel deletes.
       auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object);
       for (auto& o : client_.ListObjects(p.bucket, prefix)) {
         submitted.push_back(DeferNotOk(
            io_context.executor()->Submit(async_delete, std::ref(client_), std::move(o))));
       }
   
       return AllFinished(submitted);
   ```
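
   (`DeferNotOk` turns the `Result<Future>` returned by `Submit` into a plain `Future` that is already marked failed if the submission itself failed, so the vector can hold `Future` directly.)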
   

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -342,6 +344,53 @@ class GcsFileSystem::Impl {
     return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object));
   }
 
+  Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) {
+    RETURN_NOT_OK(DeleteDirContents(p, io_context));
+    if (!p.object.empty()) {
+      return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object));
+    }
+    return internal::ToArrowStatus(client_.DeleteBucket(p.bucket));
+  }
+
+  Status DeleteDirContents(const GcsPath& p, const io::IOContext& io_context) {
+    // Deleting large directories can be fairly slow, so we parallelize the
+    // operation by submitting multiple delete requests to the I/O context's
+    // executor. A simple form of flow control limits the number of operations
+    // running in parallel.
+
+    auto async_delete =
+        [&p](gcs::Client& client,
+             google::cloud::StatusOr<gcs::ObjectMetadata> o) -> google::cloud::Status {

Review comment:
       Two suggestions:
   1) capture the client in the closure (instead of passing it as a parameter)
   2) return an arrow `Status` here, such that you can use `AllFinished(const std::vector<Future>&)`
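
   Combining the two, something like this (untested sketch; `client_`, `p`, and `internal::ToArrowStatus` as in this diff):
   ```c++
    // Capture the client via `this` and return an arrow Status so the futures
    // can later be collected with AllFinished().
    auto async_delete =
        [&p, this](google::cloud::StatusOr<gcs::ObjectMetadata> o) -> Status {
      if (!o) return internal::ToArrowStatus(std::move(o).status());
      // The list includes the directory, skip it. DeleteDir() takes care of it.
      if (o->bucket() == p.bucket && o->name() == p.object) return Status::OK();
      return internal::ToArrowStatus(client_.DeleteObject(
          o->bucket(), o->name(), gcs::Generation(o->generation())));
    };
   ```
   The collection code could then keep a `std::vector<Future<>> submitted;` and end with something like `return AllFinished(submitted).status();`.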




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
