kou commented on code in PR #39067:
URL: https://github.com/apache/arrow/pull/39067#discussion_r1512007062


##########
cpp/src/arrow/filesystem/filesystem.cc:
##########
@@ -15,12 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <mutex>
+#include <shared_mutex>
 #include <sstream>
+#include <unordered_map>
 #include <utility>
 
+#include "arrow/type_fwd.h"
 #include "arrow/util/config.h"
 
 #include "arrow/filesystem/filesystem.h"
+#include "arrow/util/string.h"

Review Comment:
   Can we move this to below the `arrow/util/*.h` block?



##########
cpp/src/arrow/filesystem/filesystem.h:
##########
@@ -519,6 +568,85 @@ Result<std::shared_ptr<FileSystem>> 
FileSystemFromUriOrPath(
 
 /// @}
 
+/// \defgroup filesystem-factory-registration Helpers for FileSystem 
registration
+///
+/// @{
+
+/// \brief Register a Filesystem factory
+///
+/// Support for custom Uri schemes can be added by registering a factory
+/// for the corresponding FileSystem.
+///
+/// \param[in] scheme a Uri scheme which the factory will handle.
+///            If a factory has already been registered for a scheme,
+///            the new factory will be ignored.
+/// \param[in] factory a function which can produce a FileSystem for Uris 
which match
+///            scheme.
+/// \param[in] finalizer a function which must be called to finalize the 
factory before
+///            the process exits, or nullptr if no finalization is necessary.
+/// \return raises KeyError if a name collision occurs.
+ARROW_EXPORT Status RegisterFileSystemFactory(std::string scheme,
+                                              FileSystemFactory factory,
+                                              std::function<void()> finalizer 
= {});
+
+/// \brief Register Filesystem factories from a shared library

Review Comment:
   ```suggestion
   /// \brief Register FileSystem factories from a shared library
   ```



##########
cpp/src/arrow/filesystem/filesystem.cc:
##########
@@ -674,6 +685,141 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& 
source_fs,
   return CopyFiles(sources, destinations, io_context, chunk_size, use_threads);
 }
 
+class FileSystemFactoryRegistry {
+ public:
+  static FileSystemFactoryRegistry* GetInstance() {
+    static FileSystemFactoryRegistry registry;
+    return &registry;
+  }
+
+  Result<FileSystemFactory*> FactoryForScheme(const std::string& scheme) {
+    std::shared_lock lock{mutex_};
+    if (finalized_) return AlreadyFinalized();
+
+    auto it = scheme_to_factory_.find(scheme);
+    if (it == scheme_to_factory_.end()) return nullptr;
+
+    return it->second.Map([](const auto& r) { return r.factory; });
+  }
+
+  Status MergeInto(FileSystemFactoryRegistry* main_registry) {
+    std::unique_lock lock{mutex_};
+    if (finalized_) return AlreadyFinalized();
+
+    auto& [main_mutex, main_scheme_to_factory, _] = *main_registry;
+    std::unique_lock main_lock{main_mutex};
+
+    std::vector<std::string_view> duplicated_schemes;
+    for (auto& [scheme, registered] : scheme_to_factory_) {
+      if (!registered.ok()) {
+        duplicated_schemes.emplace_back(scheme);
+        continue;
+      }
+
+      auto [it, success] = main_scheme_to_factory.emplace(std::move(scheme), 
registered);
+      if (success) continue;
+
+      duplicated_schemes.emplace_back(it->first);
+    }
+    scheme_to_factory_.clear();
+
+    if (duplicated_schemes.empty()) return Status::OK();
+    return Status::KeyError("Attempted to register ", 
duplicated_schemes.size(),
+                            " factories for schemes ['",
+                            arrow::internal::JoinStrings(duplicated_schemes, 
"', '"),
+                            "'] but those schemes were already registered.");
+  }
+
+  void EnsureFinalized() {
+    std::unique_lock lock{mutex_};
+    if (finalized_) return;
+
+    for (const auto& [_, registered_or_error] : scheme_to_factory_) {
+      if (!registered_or_error.ok()) continue;
+      registered_or_error->finalizer();
+    }
+    finalized_ = true;
+  }
+
+  Status RegisterFactory(std::string scheme, FileSystemFactory factory,
+                         std::function<void()> finalizer, bool defer_error) {
+    std::unique_lock lock{mutex_};
+    if (finalized_) return AlreadyFinalized();
+
+    auto [it, success] = scheme_to_factory_.emplace(
+        std::move(scheme), Registered{factory, std::move(finalizer)});
+    if (success) {
+      return Status::OK();
+    }
+
+    auto st = Status::KeyError(
+        "Attempted to register factory for "
+        "scheme '",
+        it->first,
+        "' but that scheme is already "
+        "registered.");
+    if (!defer_error) return st;
+
+    it->second = std::move(st);
+    return Status::OK();
+  }
+
+ private:
+  struct Registered {
+    FileSystemFactory* factory;
+    std::function<void()> finalizer;
+  };
+
+  static Status AlreadyFinalized() {
+    return Status::Invalid(
+        "FileSystem factories were already "
+        "finalized!");
+  }
+
+  std::shared_mutex mutex_;
+  std::unordered_map<std::string, Result<Registered>> scheme_to_factory_;
+  bool finalized_ = false;
+};
+
+Status RegisterFileSystemFactory(std::string scheme, FileSystemFactory factory,
+                                 std::function<void()> finalizer) {
+  return FileSystemFactoryRegistry::GetInstance()->RegisterFactory(
+      std::move(scheme), factory, std::move(finalizer),

Review Comment:
   Do we need `std::move()` here?
   
   ```suggestion
         std::move(scheme), std::move(factory), std::move(finalizer),
   ```



##########
cpp/src/arrow/filesystem/filesystem.h:
##########
@@ -519,6 +568,85 @@ Result<std::shared_ptr<FileSystem>> 
FileSystemFromUriOrPath(
 
 /// @}
 
+/// \defgroup filesystem-factory-registration Helpers for FileSystem 
registration
+///
+/// @{
+
+/// \brief Register a Filesystem factory

Review Comment:
   ```suggestion
   /// \brief Register a FileSystem factory
   ```



##########
cpp/src/arrow/filesystem/filesystem.h:
##########
@@ -519,6 +568,85 @@ Result<std::shared_ptr<FileSystem>> 
FileSystemFromUriOrPath(
 
 /// @}
 
+/// \defgroup filesystem-factory-registration Helpers for FileSystem 
registration
+///
+/// @{
+
+/// \brief Register a Filesystem factory
+///
+/// Support for custom Uri schemes can be added by registering a factory

Review Comment:
   URI?
   
   ```suggestion
   /// Support for custom URI schemes can be added by registering a factory
   ```



##########
cpp/src/arrow/filesystem/filesystem.h:
##########
@@ -519,6 +568,85 @@ Result<std::shared_ptr<FileSystem>> 
FileSystemFromUriOrPath(
 
 /// @}
 
+/// \defgroup filesystem-factory-registration Helpers for FileSystem 
registration
+///
+/// @{
+
+/// \brief Register a Filesystem factory
+///
+/// Support for custom Uri schemes can be added by registering a factory
+/// for the corresponding FileSystem.
+///
+/// \param[in] scheme a Uri scheme which the factory will handle.
+///            If a factory has already been registered for a scheme,
+///            the new factory will be ignored.
+/// \param[in] factory a function which can produce a FileSystem for Uris 
which match
+///            scheme.
+/// \param[in] finalizer a function which must be called to finalize the 
factory before
+///            the process exits, or nullptr if no finalization is necessary.
+/// \return raises KeyError if a name collision occurs.
+ARROW_EXPORT Status RegisterFileSystemFactory(std::string scheme,
+                                              FileSystemFactory factory,
+                                              std::function<void()> finalizer 
= {});
+
+/// \brief Register Filesystem factories from a shared library
+///
+/// FileSystem implementations may be housed in separate shared libraries and 
only
+/// registered when the shared library is explicitly loaded. 
FileSystemRegistrar is
+/// provided to simplify definition of such libraries: each instance at 
namespace scope
+/// in the library will register a factory for a scheme. Any library which uses
+/// FileSystemRegistrars and which must be dynamically loaded should be loaded 
using
+/// LoadFileSystemFactories(), which will additionally merge registries are if 
necessary
+/// (static linkage to arrow can produce isolated registries).
+ARROW_EXPORT Status LoadFileSystemFactories(const char* libpath);
+
+struct ARROW_EXPORT FileSystemRegistrar {
+  /// \brief Register a Filesystem factory at load time

Review Comment:
   ```suggestion
     /// \brief Register a FileSystem factory at load time
   ```



##########
cpp/src/arrow/filesystem/filesystem.cc:
##########
@@ -674,6 +685,141 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& 
source_fs,
   return CopyFiles(sources, destinations, io_context, chunk_size, use_threads);
 }
 
+class FileSystemFactoryRegistry {
+ public:
+  static FileSystemFactoryRegistry* GetInstance() {
+    static FileSystemFactoryRegistry registry;
+    return &registry;
+  }
+
+  Result<FileSystemFactory*> FactoryForScheme(const std::string& scheme) {
+    std::shared_lock lock{mutex_};
+    if (finalized_) return AlreadyFinalized();
+
+    auto it = scheme_to_factory_.find(scheme);
+    if (it == scheme_to_factory_.end()) return nullptr;
+
+    return it->second.Map([](const auto& r) { return r.factory; });
+  }
+
+  Status MergeInto(FileSystemFactoryRegistry* main_registry) {
+    std::unique_lock lock{mutex_};
+    if (finalized_) return AlreadyFinalized();
+
+    auto& [main_mutex, main_scheme_to_factory, _] = *main_registry;
+    std::unique_lock main_lock{main_mutex};
+
+    std::vector<std::string_view> duplicated_schemes;
+    for (auto& [scheme, registered] : scheme_to_factory_) {
+      if (!registered.ok()) {
+        duplicated_schemes.emplace_back(scheme);
+        continue;
+      }
+
+      auto [it, success] = main_scheme_to_factory.emplace(std::move(scheme), 
registered);
+      if (success) continue;
+
+      duplicated_schemes.emplace_back(it->first);
+    }
+    scheme_to_factory_.clear();
+
+    if (duplicated_schemes.empty()) return Status::OK();
+    return Status::KeyError("Attempted to register ", 
duplicated_schemes.size(),
+                            " factories for schemes ['",
+                            arrow::internal::JoinStrings(duplicated_schemes, 
"', '"),
+                            "'] but those schemes were already registered.");
+  }
+
+  void EnsureFinalized() {
+    std::unique_lock lock{mutex_};
+    if (finalized_) return;
+
+    for (const auto& [_, registered_or_error] : scheme_to_factory_) {
+      if (!registered_or_error.ok()) continue;
+      registered_or_error->finalizer();
+    }
+    finalized_ = true;
+  }
+
+  Status RegisterFactory(std::string scheme, FileSystemFactory factory,
+                         std::function<void()> finalizer, bool defer_error) {
+    std::unique_lock lock{mutex_};
+    if (finalized_) return AlreadyFinalized();
+
+    auto [it, success] = scheme_to_factory_.emplace(
+        std::move(scheme), Registered{factory, std::move(finalizer)});
+    if (success) {
+      return Status::OK();
+    }
+
+    auto st = Status::KeyError(
+        "Attempted to register factory for "
+        "scheme '",
+        it->first,
+        "' but that scheme is already "
+        "registered.");
+    if (!defer_error) return st;
+
+    it->second = std::move(st);
+    return Status::OK();
+  }
+
+ private:
+  struct Registered {
+    FileSystemFactory* factory;
+    std::function<void()> finalizer;
+  };
+
+  static Status AlreadyFinalized() {
+    return Status::Invalid(
+        "FileSystem factories were already "
+        "finalized!");
+  }
+
+  std::shared_mutex mutex_;
+  std::unordered_map<std::string, Result<Registered>> scheme_to_factory_;
+  bool finalized_ = false;
+};
+
+Status RegisterFileSystemFactory(std::string scheme, FileSystemFactory factory,
+                                 std::function<void()> finalizer) {
+  return FileSystemFactoryRegistry::GetInstance()->RegisterFactory(
+      std::move(scheme), factory, std::move(finalizer),
+      /*defer_error=*/false);
+}
+
+void EnsureFinalized() { 
FileSystemFactoryRegistry::GetInstance()->EnsureFinalized(); }
+
+FileSystemRegistrar::FileSystemRegistrar(std::string scheme, FileSystemFactory 
factory,
+                                         std::function<void()> finalizer) {
+  DCHECK_OK(FileSystemFactoryRegistry::GetInstance()->RegisterFactory(
+      std::move(scheme), std::move(factory), std::move(finalizer),
+      /*defer_error=*/true));
+}
+
+extern "C" {
+ARROW_EXPORT void* arrow_FileSystem_GetRegistry() {

Review Comment:
   Can we avoid mixing snake_case and camelCase?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to