coryan commented on a change in pull request #11996:
URL: https://github.com/apache/arrow/pull/11996#discussion_r780442988



##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -505,20 +555,23 @@ class GcsFileSystem::Impl {
   }
 
  private:
-  static Result<FileInfo> GetFileInfoDirectory(const GcsPath& path,
-                                               const google::cloud::Status& 
status) {
-    using ::google::cloud::StatusCode;
-    auto canonical = internal::EnsureTrailingSlash(path.full_path);
+  static bool IsDirectory(gcs::ObjectMetadata const& o) {

Review comment:
       Fixed

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -310,93 +318,107 @@ class GcsFileSystem::Impl {
   Result<FileInfo> GetFileInfo(const GcsPath& path) {
     if (path.object.empty()) {
       auto meta = client_.GetBucketMetadata(path.bucket);
-      return GetFileInfoDirectory(path, std::move(meta).status());
+      return GetFileInfoBucket(path, std::move(meta).status());
     }
     auto meta = client_.GetObjectMetadata(path.bucket, path.object);
-    if (path.object.back() == '/') {
-      return GetFileInfoDirectory(path, std::move(meta).status());
-    }
-    return GetFileInfoFile(path, meta);
+    return GetFileInfoObject(path, meta);
   }
 
   Result<FileInfoVector> GetFileInfo(const FileSelector& select) {
     ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir));
+    // Adding the trailing '/' avoids problems with files named 'a', 'ab', 'ac',
+    // where GCS would return all of them if the prefix is 'a'.
     const auto canonical = internal::EnsureTrailingSlash(p.object);
     const auto max_depth = internal::Depth(canonical) + select.max_recursion;
     auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
     auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/");
-    bool found_directory = false;
     FileInfoVector result;
     for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) {
       if (!o.ok()) {
         if (select.allow_not_found &&
             o.status().code() == google::cloud::StatusCode::kNotFound) {
-          continue;
+          return result;
         }
         return internal::ToArrowStatus(o.status());
       }
-      found_directory = true;
       // Skip the directory itself from the results, and any result that is 
"too deep"
       // into the recursion.
       if (o->name() == p.object || internal::Depth(o->name()) > max_depth) {
         continue;
       }
       auto path = internal::ConcatAbstractPath(o->bucket(), o->name());
-      if (o->name().back() == '/') {
-        result.push_back(
-            FileInfo(internal::EnsureTrailingSlash(path), 
FileType::Directory));
-        continue;
-      }
       result.push_back(ToFileInfo(path, *o));
     }
-    if (!found_directory && !select.allow_not_found) {
-      return Status::IOError("No such file or directory '", select.base_dir, 
"'");
+    // Finding any elements indicates the directory was found.
+    if (!result.empty() || select.allow_not_found) {
+      return result;
     }
-    return result;
+    // To find out if the directory exists we need to perform an additional 
query.
+    ARROW_ASSIGN_OR_RAISE(auto directory, GetFileInfo(p));
+    if (directory.IsDirectory()) return result;
+    return Status::IOError("No such file or directory '", select.base_dir, 
"'");
   }
 
   // GCS does not have directories or folders. But folders can be emulated 
(with some
   // limitations) using marker objects.  That and listing with prefixes 
creates the
   // illusion of folders.
-  google::cloud::Status CreateDirMarker(const std::string& bucket,
-                                        util::string_view name) {
+  google::cloud::StatusOr<gcs::ObjectMetadata> CreateDirMarker(const 
std::string& bucket,
+                                                               
util::string_view name) {
     // Make the name canonical.
-    const auto canonical = internal::EnsureTrailingSlash(name);
-    return client_
-        .InsertObject(bucket, canonical, std::string(),
-                      
gcs::WithObjectMetadata(gcs::ObjectMetadata().upsert_metadata(
-                          "arrow/gcsfs", "directory")))
-        .status();
+    const auto canonical = internal::RemoveTrailingSlash(name).to_string();
+    auto object = client_.InsertObject(

Review comment:
       Fixed.

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -310,93 +318,107 @@ class GcsFileSystem::Impl {
   Result<FileInfo> GetFileInfo(const GcsPath& path) {
     if (path.object.empty()) {
       auto meta = client_.GetBucketMetadata(path.bucket);
-      return GetFileInfoDirectory(path, std::move(meta).status());
+      return GetFileInfoBucket(path, std::move(meta).status());
     }
     auto meta = client_.GetObjectMetadata(path.bucket, path.object);
-    if (path.object.back() == '/') {
-      return GetFileInfoDirectory(path, std::move(meta).status());
-    }
-    return GetFileInfoFile(path, meta);
+    return GetFileInfoObject(path, meta);
   }
 
   Result<FileInfoVector> GetFileInfo(const FileSelector& select) {
     ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir));
+    // Adding the trailing '/' avoids problems with files named 'a', 'ab', 'ac',
+    // where GCS would return all of them if the prefix is 'a'.
     const auto canonical = internal::EnsureTrailingSlash(p.object);
     const auto max_depth = internal::Depth(canonical) + select.max_recursion;
     auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
     auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/");
-    bool found_directory = false;
     FileInfoVector result;
     for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) {
       if (!o.ok()) {
         if (select.allow_not_found &&
             o.status().code() == google::cloud::StatusCode::kNotFound) {
-          continue;
+          return result;
         }
         return internal::ToArrowStatus(o.status());
       }
-      found_directory = true;
       // Skip the directory itself from the results, and any result that is 
"too deep"
       // into the recursion.
       if (o->name() == p.object || internal::Depth(o->name()) > max_depth) {
         continue;
       }
       auto path = internal::ConcatAbstractPath(o->bucket(), o->name());
-      if (o->name().back() == '/') {
-        result.push_back(
-            FileInfo(internal::EnsureTrailingSlash(path), 
FileType::Directory));
-        continue;
-      }
       result.push_back(ToFileInfo(path, *o));
     }
-    if (!found_directory && !select.allow_not_found) {
-      return Status::IOError("No such file or directory '", select.base_dir, 
"'");
+    // Finding any elements indicates the directory was found.
+    if (!result.empty() || select.allow_not_found) {
+      return result;
     }
-    return result;
+    // To find out if the directory exists we need to perform an additional 
query.
+    ARROW_ASSIGN_OR_RAISE(auto directory, GetFileInfo(p));
+    if (directory.IsDirectory()) return result;
+    return Status::IOError("No such file or directory '", select.base_dir, 
"'");
   }
 
   // GCS does not have directories or folders. But folders can be emulated 
(with some
   // limitations) using marker objects.  That and listing with prefixes 
creates the
   // illusion of folders.
-  google::cloud::Status CreateDirMarker(const std::string& bucket,
-                                        util::string_view name) {
+  google::cloud::StatusOr<gcs::ObjectMetadata> CreateDirMarker(const 
std::string& bucket,
+                                                               
util::string_view name) {
     // Make the name canonical.
-    const auto canonical = internal::EnsureTrailingSlash(name);
-    return client_
-        .InsertObject(bucket, canonical, std::string(),
-                      
gcs::WithObjectMetadata(gcs::ObjectMetadata().upsert_metadata(
-                          "arrow/gcsfs", "directory")))
-        .status();
+    const auto canonical = internal::RemoveTrailingSlash(name).to_string();
+    auto object = client_.InsertObject(
+        bucket, canonical, std::string(),
+        gcs::WithObjectMetadata(
+            gcs::ObjectMetadata().upsert_metadata("arrow/gcsfs", "directory")),
+        gcs::IfGenerationMatch(0));
+    if (object) return object;
+    if (object.status().code() == GcsCode::kFailedPrecondition) {
+      // The marker already exists, find out if it is a directory or a file.
+      return client_.GetObjectMetadata(bucket, canonical);
+    }
+    return object;
   }
 
-  google::cloud::Status CreateDirMarkerRecursive(const std::string& bucket,
-                                                 const std::string& object) {
-    using GcsCode = google::cloud::StatusCode;
+  static Status NotDirectoryError(const gcs::ObjectMetadata& o) {
+    return Status::IOError(
+        "Cannot create directory, it conflicts with an existing file '",
+        internal::ConcatAbstractPath(o.bucket(), o.name()), "'");
+  }
+
+  Status CreateDirMarkerRecursive(const std::string& bucket, const 
std::string& name) {
     auto get_parent = [](std::string const& path) {
       return std::move(internal::GetAbstractPathParent(path).first);
     };
-    // Maybe counterintuitively we create the markers from the most nested and 
up. Because
-    // GCS does not have directories creating `a/b/c` will succeed, even if 
`a/` or `a/b/`
-    // does not exist.  In the common case, where `a/b/` may already exist, it 
is more
-    // efficient to just create `a/b/c/` and then find out that `a/b/` was 
already there.
-    // In the case where none exists, it does not matter which order we follow.
-    auto status = CreateDirMarker(bucket, object);
-    if (status.code() == GcsCode::kAlreadyExists) return {};
-    if (status.code() == GcsCode::kNotFound) {
-      // Missing bucket, create it first ...
-      status = client_.CreateBucket(bucket, gcs::BucketMetadata()).status();
-      if (status.code() != GcsCode::kOk && status.code() != 
GcsCode::kAlreadyExists) {
-        return status;
+    // Find the list of missing parents. In the process we discover if any 
elements in
+    // the path are files, this is unavoidable as GCS does not really have 
directories.
+    std::vector<std::string> missing_parents;
+    auto dir = name;
+    for (; !dir.empty(); dir = get_parent(dir)) {
+      auto o = client_.GetObjectMetadata(bucket, dir);
+      if (o) {
+        if (IsDirectory(*o)) break;
+        return NotDirectoryError(*o);
       }
+      missing_parents.push_back(dir);
     }
-
-    for (auto parent = get_parent(object); !parent.empty(); parent = 
get_parent(parent)) {
-      status = CreateDirMarker(bucket, parent);
-      if (status.code() == GcsCode::kAlreadyExists) {
-        break;
+    if (dir.empty()) {
+      // We could not find any of the parent directories in the bucket, the 
last step is

Review comment:
       Seems like this is what you do for s3 too:
   
   
https://github.com/apache/arrow/blob/b325ef7f95f8348cc7b3230dd65a172bfd0ce650/cpp/src/arrow/filesystem/s3fs.cc#L2305-L2311
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to