This is an automated email from the ASF dual-hosted git repository.
raulcd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 591c14b007 GH-46369: [C++] Add key-value pairs to the
FileSystemFactory interface and to S3Options (#50044)
591c14b007 is described below
commit 591c14b007f48905cf5146add136573b74ca47a8
Author: Raúl Cumplido <[email protected]>
AuthorDate: Thu Jun 18 09:41:24 2026 +0200
GH-46369: [C++] Add key-value pairs to the FileSystemFactory interface and
to S3Options (#50044)
### Rationale for this change
From the issue description:
> From discussion on PR
https://github.com/apache/arrow/pull/41559#discussion_r1768836077 and the
[mailing
list](https://lists.apache.org/thread/ot4p23mc292p5103nfgrbyqpnhhp6clx), it's
desirable to extend the filesystem factory interface to accept a set of
key-value pairs in addition to a URL. This will draw a clearer distinction
between filesystem options which are expected to be independent of the user
(the URI, which should be reusable by multiple users) and options which are
expec [...]
### What changes are included in this PR?
- A new options `FileSystemFactoryOptions`, added to
`FileSystemFromUriAndOptions` which allows to pass via `FileSystemFromUriReal`
through `FileSystemFactory::function`
- The `s3` and `local`/`file` factories are migrated to the new 4-arg
signature.
- Schemes that do not yet consume options reject non-empty options with
`NotImplemented` rather than silently dropping them.
- `examplefs` now reads two typed options and appends them to the output
path, exercised by `arrow-filesystem-test`, proving `std::any` survives the
`libarrow` to `arrow_filesystem_example.so` boundary.
- CI: `ci/scripts/cpp_build.sh` now forwards `ARROW_S3_MODULE` to CMake.
The conda-cpp image was already setting `ARROW_S3_MODULE=ON`, but the flag was
never passed through, so `s3fs_module_test` (added in #41559) had not actually
been built or run in CI. This activates it.
- Added `access_key`, `secret_key`, `session_token`, `retry_strategy` and
`default_metadata` as `FileSystemFactoryOptions` on
`S3Options::FromUriAndOptions`.
- Tests exercising
### Are these changes tested?
Yes, new tests added to validate a dynamically-loaded filesystem is able to
receive options via `std::any` and tests validating rejection if options passes
to Filesystems that don't support it. Tests also added for
`S3Options::FromUriAndOptions` receiving the new `FileSystemFactoryOptions` and
usage on separate s3fs module
### Are there any user-facing changes?
New public `FileSystemFromUriOptions` taking an options list. Existing ones
are unchanged.
* GitHub Issue: #46369
Lead-authored-by: Raúl Cumplido <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Raúl Cumplido <[email protected]>
---
ci/scripts/cpp_build.sh | 2 +
cpp/src/arrow/filesystem/CMakeLists.txt | 2 +-
cpp/src/arrow/filesystem/filesystem.cc | 32 ++++--
cpp/src/arrow/filesystem/filesystem.h | 88 +++++++++++++++-
cpp/src/arrow/filesystem/filesystem_test.cc | 6 ++
cpp/src/arrow/filesystem/localfs_test.cc | 68 ++++++++++++-
cpp/src/arrow/filesystem/s3fs.cc | 112 ++++++++++++++++-----
cpp/src/arrow/filesystem/s3fs.h | 18 ++++
cpp/src/arrow/filesystem/s3fs_module_test.cc | 51 ++++++++++
cpp/src/arrow/filesystem/s3fs_test.cc | 90 +++++++++++++++++
cpp/src/arrow/filesystem/util_internal.h | 16 +++
cpp/src/arrow/testing/examplefs.cc | 36 ++++++-
.../arrow/testing/{examplefs.cc => examplefs.h} | 22 ++--
13 files changed, 484 insertions(+), 59 deletions(-)
diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh
index 1e2f3e8f8f..3d9b2ba72d 100755
--- a/ci/scripts/cpp_build.sh
+++ b/ci/scripts/cpp_build.sh
@@ -70,6 +70,7 @@ if [ "${ARROW_ENABLE_THREADING:-ON}" = "OFF" ]; then
ARROW_JEMALLOC=OFF
ARROW_MIMALLOC=OFF
ARROW_S3=OFF
+ ARROW_S3_MODULE=OFF
ARROW_WITH_OPENTELEMETRY=OFF
fi
@@ -229,6 +230,7 @@ else
-DARROW_PARQUET=${ARROW_PARQUET:-OFF} \
-DARROW_RUNTIME_SIMD_LEVEL=${ARROW_RUNTIME_SIMD_LEVEL:-MAX} \
-DARROW_S3=${ARROW_S3:-OFF} \
+ -DARROW_S3_MODULE=${ARROW_S3_MODULE:-OFF} \
-DARROW_SIMD_LEVEL=${ARROW_SIMD_LEVEL:-DEFAULT} \
-DARROW_SUBSTRAIT=${ARROW_SUBSTRAIT:-OFF} \
-DARROW_TEST_LINKAGE=${ARROW_TEST_LINKAGE:-shared} \
diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt
b/cpp/src/arrow/filesystem/CMakeLists.txt
index e6330df426..ee46f4d256 100644
--- a/cpp/src/arrow/filesystem/CMakeLists.txt
+++ b/cpp/src/arrow/filesystem/CMakeLists.txt
@@ -128,7 +128,7 @@ if(ARROW_S3)
endif()
endif()
- if(ARROW_S3_MODULE)
+ if(ARROW_S3_MODULE AND ARROW_BUILD_TESTS)
add_arrow_test(s3fs_module_test
SOURCES
s3fs_module_test.cc
diff --git a/cpp/src/arrow/filesystem/filesystem.cc
b/cpp/src/arrow/filesystem/filesystem.cc
index 8281bed7ce..f92336e004 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -893,10 +893,10 @@ Status LoadFileSystemFactories(const char* libpath) {
namespace {
-Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
- const std::string&
uri_string,
- const io::IOContext&
io_context,
- std::string*
out_path) {
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(
+ const Uri& uri, const std::string& uri_string,
+ const FileSystemFactoryOptions& options, const io::IOContext& io_context,
+ std::string* out_path) {
const auto scheme = uri.scheme();
{
@@ -904,9 +904,13 @@ Result<std::shared_ptr<FileSystem>>
FileSystemFromUriReal(const Uri& uri,
auto* factory,
FileSystemFactoryRegistry::GetInstance()->FactoryForScheme(scheme));
if (factory != nullptr) {
- return factory->function(uri, io_context, out_path);
+ return factory->function(uri, options, io_context, out_path);
}
}
+ if (!options.empty()) {
+ return Status::NotImplemented("Filesystem options are not supported yet
for scheme '",
+ scheme, "', got ", options.size(), "
option(s)");
+ }
if (scheme == "abfs" || scheme == "abfss") {
#ifdef ARROW_AZURE
@@ -962,14 +966,28 @@ Result<std::shared_ptr<FileSystem>>
FileSystemFromUriReal(const Uri& uri,
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string&
uri_string,
std::string* out_path) {
- return FileSystemFromUri(uri_string, io::default_io_context(), out_path);
+ return FileSystemFromUriAndOptions(uri_string, /*options=*/{},
io::default_io_context(),
+ out_path);
+}
+
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
+ const std::string& uri_string, const FileSystemFactoryOptions& options,
+ std::string* out_path) {
+ return FileSystemFromUriAndOptions(uri_string, options,
io::default_io_context(),
+ out_path);
}
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string&
uri_string,
const io::IOContext&
io_context,
std::string* out_path) {
+ return FileSystemFromUriAndOptions(uri_string, /*options=*/{}, io_context,
out_path);
+}
+
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
+ const std::string& uri_string, const FileSystemFactoryOptions& options,
+ const io::IOContext& io_context, std::string* out_path) {
ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string));
- return FileSystemFromUriReal(fsuri, uri_string, io_context, out_path);
+ return FileSystemFromUriReal(fsuri, uri_string, options, io_context,
out_path);
}
Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(const std::string&
uri_string,
diff --git a/cpp/src/arrow/filesystem/filesystem.h
b/cpp/src/arrow/filesystem/filesystem.h
index 3a47eb62f5..a2862d9c1f 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -17,6 +17,7 @@
#pragma once
+#include <any>
#include <chrono>
#include <cstdint>
#include <functional>
@@ -28,6 +29,7 @@
#include "arrow/filesystem/type_fwd.h"
#include "arrow/io/interfaces.h"
+#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compare.h"
#include "arrow/util/macros.h"
@@ -357,13 +359,44 @@ class ARROW_EXPORT FileSystem
bool default_async_is_sync_ = true;
};
+using FileSystemFactoryOptions = std::vector<std::pair<std::string, std::any>>;
+
struct FileSystemFactory {
std::function<Result<std::shared_ptr<FileSystem>>(
- const Uri& uri, const io::IOContext& io_context, std::string* out_path)>
+ const Uri& uri, const FileSystemFactoryOptions& options,
+ const io::IOContext& io_context, std::string* out_path)>
function;
std::string_view file;
int line;
+ /// Construct from an options-aware factory function.
+ FileSystemFactory(std::function<Result<std::shared_ptr<FileSystem>>(
+ const Uri&, const FileSystemFactoryOptions&, const
io::IOContext&,
+ std::string*)>
+ fn,
+ std::string_view file, int line)
+ : function(std::move(fn)), file(file), line(line) {}
+
+ /// Construct from a non-options aware factory function maintaining source
compatibility
+ /// with existing factories.
+ FileSystemFactory(std::function<Result<std::shared_ptr<FileSystem>>(
+ const Uri&, const io::IOContext&, std::string*)>
+ fn,
+ std::string_view file, int line)
+ : function([fn = std::move(fn)](
+ const Uri& uri, const FileSystemFactoryOptions& options,
+ const io::IOContext& ctx,
+ std::string* out_path) ->
Result<std::shared_ptr<FileSystem>> {
+ if (!options.empty()) {
+ return Status::NotImplemented(
+ "Filesystem factory does not support additional options, got ",
+ options.size(), " option(s)");
+ }
+ return fn(uri, ctx, out_path);
+ }),
+ file(file),
+ line(line) {}
+
bool operator==(const FileSystemFactory& other) const {
// In the case where libarrow is linked statically both to the executable
and to a
// dynamically loaded filesystem implementation library, the library
contains a
@@ -536,7 +569,7 @@ void EnsureFinalized();
/// \brief Create a new FileSystem by URI
///
/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
-/// "gs" and "gcs".
+/// "gs", "gcs", "abfs" and "abfss".
///
/// Support for other schemes can be added using RegisterFileSystemFactory.
///
@@ -547,10 +580,34 @@ ARROW_EXPORT
Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
std::string* out_path =
NULLPTR);
+/// \brief Create a new FileSystem by URI with extended backend-specific
filesystem
+/// options
+///
+/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
+/// "gs", "gcs", "abfs" and "abfss".
+///
+/// Support for other schemes can be added using RegisterFileSystemFactory.
+///
+/// \param[in] uri the URI to give access to
+/// \param[in] options a list of backend-specific filesystem options
+/// Each option is a (name, value) pair.
+/// The expected type is specific to the backend and
+/// option name.
+/// Options are forwarded to schemes dispatched through a registered
+/// FileSystemFactory. Non-empty options return NotImplemented for
a registered
+/// FileSystemFactory that does not support them or for schemes not
handled by
+/// a registered factory.
+/// \param[out] out_path (optional) Path inside the filesystem.
+/// \return out_fs FileSystem instance.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
+ const std::string& uri, const FileSystemFactoryOptions& options,
+ std::string* out_path = NULLPTR);
+
/// \brief Create a new FileSystem by URI with a custom IO context
///
/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
-/// "gs" and "gcs".
+/// "gs", "gcs", "abfs" and "abfss".
///
/// Support for other schemes can be added using RegisterFileSystemFactory.
///
@@ -563,6 +620,31 @@ Result<std::shared_ptr<FileSystem>>
FileSystemFromUri(const std::string& uri,
const io::IOContext&
io_context,
std::string* out_path =
NULLPTR);
+/// \brief Create a new FileSystem by URI with a custom IO context with
backend-specific
+/// filesystem options
+///
+/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
+/// "gs", "gcs", "abfs" and "abfss".
+///
+/// Support for other schemes can be added using RegisterFileSystemFactory.
+///
+/// \param[in] uri a URI-based path, ex: file:///some/local/path
+/// \param[in] options a list of backend-specific filesystem options
+/// Each option is a (name, value) pair.
+/// The expected type is specific to the backend and
+/// option name.
+/// Options are forwarded to schemes dispatched through a registered
+/// FileSystemFactory. Non-empty options return NotImplemented for
a registered
+/// FileSystemFactory that does not support them or for schemes not
handled by
+/// a registered factory.
+/// \param[in] io_context an IOContext which will be associated with the
filesystem
+/// \param[out] out_path (optional) Path inside the filesystem.
+/// \return out_fs FileSystem instance.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriAndOptions(
+ const std::string& uri, const FileSystemFactoryOptions& options,
+ const io::IOContext& io_context, std::string* out_path = NULLPTR);
+
/// \brief Create a new FileSystem by URI
///
/// Support for other schemes can be added using RegisterFileSystemFactory.
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc
b/cpp/src/arrow/filesystem/filesystem_test.cc
index 5072c3a8c2..10a3922e36 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <any>
#include <memory>
#include <string>
#include <utility>
@@ -640,6 +641,11 @@ TEST_F(TestMockFS, FileSystemFromUri) {
Invalid, ::testing::HasSubstr("syntax error at character ' ' (position
12)"),
FileSystemFromUri("mock:/folder name/bar", &path));
CheckDirs({});
+ FileSystemFactoryOptions options{{"some_option", 1}};
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ NotImplemented, ::testing::HasSubstr("options are not supported"),
+ FileSystemFromUriAndOptions("mock:///foo/bar", options, &path));
+ CheckDirs({});
}
////////////////////////////////////////////////////////////////////////////
diff --git a/cpp/src/arrow/filesystem/localfs_test.cc
b/cpp/src/arrow/filesystem/localfs_test.cc
index 2e91783c92..212d919896 100644
--- a/cpp/src/arrow/filesystem/localfs_test.cc
+++ b/cpp/src/arrow/filesystem/localfs_test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <any>
#include <memory>
#include <sstream>
#include <string>
@@ -29,6 +30,7 @@
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/test_util.h"
#include "arrow/filesystem/util_internal.h"
+#include "arrow/testing/examplefs.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/util/io_util.h"
@@ -83,6 +85,12 @@ Result<std::shared_ptr<FileSystem>> FSFromUriOrPath(const
std::string& uri,
////////////////////////////////////////////////////////////////////////////
// Registered FileSystemFactory tests
+struct ConcreteTypedOption : ExampleTypedOption {
+ explicit ConcreteTypedOption(int value) : value_(value) {}
+ int value() const override { return value_; }
+ int value_;
+};
+
class SlowFileSystemPublicProps : public SlowFileSystem {
public:
SlowFileSystemPublicProps(std::shared_ptr<FileSystem> base_fs, double
average_latency,
@@ -144,7 +152,6 @@ TEST(FileSystemFromUri, LoadedRegisteredFactory) {
EXPECT_THAT(FileSystemFromUri("example:///hey/yo", &path),
Raises(StatusCode::Invalid));
EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok());
-
ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("example:///hey/yo", &path));
EXPECT_EQ(path, "/hey/yo");
EXPECT_EQ(fs->type_name(), "local");
@@ -171,10 +178,21 @@ TEST(FileSystemFromUri, RuntimeRegisteredFactory) {
}
FileSystemRegistrar kSegfaultFileSystemModule[]{
- ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
- ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
- ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
-};
+ ARROW_REGISTER_FILESYSTEM(
+ "segfault",
+ std::function<Result<std::shared_ptr<FileSystem>>(
+ const Uri&, const io::IOContext&, std::string*)>(nullptr),
+ {}),
+ ARROW_REGISTER_FILESYSTEM(
+ "segfault",
+ std::function<Result<std::shared_ptr<FileSystem>>(
+ const Uri&, const io::IOContext&, std::string*)>(nullptr),
+ {}),
+ ARROW_REGISTER_FILESYSTEM(
+ "segfault",
+ std::function<Result<std::shared_ptr<FileSystem>>(
+ const Uri&, const io::IOContext&, std::string*)>(nullptr),
+ {})};
TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) {
// Since multiple registrars are defined in this translation unit which all
// register factories for the 'segfault' scheme, using that scheme in
FileSystemFromUri
@@ -185,6 +203,46 @@ TEST(FileSystemFromUri,
LinkedRegisteredFactoryNameCollision) {
// other schemes are not affected by the collision
EXPECT_THAT(FileSystemFromUri("slowfile:///hey/yo", &path), Ok());
}
+
+TEST(FileSystemFromUriAndOptions, LoadedRegisteredFactory) {
+#ifdef __EMSCRIPTEN__
+ GTEST_SKIP() << "Emscripten dynamic library testing disabled";
+#endif
+ std::string path;
+ EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok());
+ // Validate extra options are forwarded to the factory.
+ FileSystemFactoryOptions options{
+ {"example_option_string", std::string("example_value")},
+ {"example_option_int", 42},
+ {"example_typed_option",
+
std::shared_ptr<ExampleTypedOption>(std::make_shared<ConcreteTypedOption>(12345))},
+ };
+ ASSERT_OK_AND_ASSIGN(auto fs,
+ FileSystemFromUriAndOptions("example:///hey/yo",
options, &path));
+ EXPECT_EQ(path, "/hey/yo/example_value/42/12345");
+ EXPECT_EQ(fs->type_name(), "local");
+}
+
+TEST(FileSystemFromUriAndOptions, RuntimeRegisteredFactory) {
+ std::string path;
+ EXPECT_THAT(FileSystemFromUriAndOptions("slowfile3:///hey/yo", {}, &path),
+ Raises(StatusCode::Invalid));
+
+ EXPECT_THAT(
+ RegisterFileSystemFactory("slowfile3", {SlowFileSystemFactory, __FILE__,
__LINE__}),
+ Ok());
+
+ ASSERT_OK_AND_ASSIGN(auto fs,
+ FileSystemFromUriAndOptions("slowfile3:///hey/yo", {},
&path));
+ EXPECT_EQ(path, "/hey/yo");
+ EXPECT_EQ(fs->type_name(), "slow");
+
+ // Validate that legacy (3-arg) factories reject non-empty options.
+ FileSystemFactoryOptions unsupported{{"some_option", 1}};
+ EXPECT_THAT(FileSystemFromUriAndOptions("slowfile3:///hey/yo", unsupported,
&path),
+ Raises(StatusCode::NotImplemented));
+}
+
////////////////////////////////////////////////////////////////////////////
// Misc tests
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 970ee9d71e..cc5fb9cd66 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -327,7 +327,50 @@ S3Options S3Options::FromAssumeRoleWithWebIdentity() {
}
Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
- S3Options options;
+ return FromUriAndOptions(uri, FileSystemFactoryOptions{}, out_path);
+}
+
+Result<S3Options> S3Options::FromUri(const std::string& uri_string,
+ std::string* out_path) {
+ Uri uri;
+ RETURN_NOT_OK(uri.Parse(uri_string));
+ return FromUri(uri, out_path);
+}
+
+Result<S3Options> S3Options::FromUriAndOptions(const ::arrow::util::Uri& uri,
+ const FileSystemFactoryOptions&
options,
+ std::string* out_path) {
+ std::optional<std::string> access_key, secret_key, session_token;
+ std::shared_ptr<S3RetryStrategy> retry_strategy;
+ std::shared_ptr<const KeyValueMetadata> default_metadata;
+ for (const auto& [key, value] : options) {
+ if (key == "access_key") {
+ ARROW_ASSIGN_OR_RAISE(access_key, internal::GetOption<std::string>(key,
value));
+ } else if (key == "secret_key") {
+ ARROW_ASSIGN_OR_RAISE(secret_key, internal::GetOption<std::string>(key,
value));
+ } else if (key == "session_token") {
+ ARROW_ASSIGN_OR_RAISE(session_token,
internal::GetOption<std::string>(key, value));
+ } else if (key == "retry_strategy") {
+ ARROW_ASSIGN_OR_RAISE(
+ retry_strategy,
+ internal::GetOption<std::shared_ptr<S3RetryStrategy>>(key, value));
+ } else if (key == "default_metadata") {
+ ARROW_ASSIGN_OR_RAISE(
+ default_metadata,
+ internal::GetConstSharedPtrOption<KeyValueMetadata>(key, value));
+ } else {
+ return Status::Invalid("Unexpected option for S3 filesystem: '", key,
"'");
+ }
+ }
+
+ if (access_key.has_value() != secret_key.has_value()) {
+ return Status::Invalid(
+ "Both 'access_key' and 'secret_key' must be provided together");
+ }
+ if (session_token.has_value() && !access_key.has_value()) {
+ return Status::Invalid("'session_token' requires 'access_key' and
'secret_key'");
+ }
+ S3Options s3_options;
const auto bucket = uri.host();
auto path = uri.path();
@@ -356,67 +399,84 @@ Result<S3Options> S3Options::FromUri(const Uri& uri,
std::string* out_path) {
}
const auto username = uri.username();
- if (!username.empty()) {
- options.ConfigureAccessKey(username, uri.password());
+ const auto password = uri.password();
+ if (access_key.has_value() && (!username.empty() || !password.empty())) {
+ return Status::Invalid(
+ "Credentials provided both in the URI and through filesystem options");
+ }
+
+ if (access_key.has_value()) {
+ s3_options.ConfigureAccessKey(*access_key, *secret_key,
session_token.value_or(""));
+ } else if (!username.empty()) {
+ s3_options.ConfigureAccessKey(username, password);
} else {
- options.ConfigureDefaultCredentials();
+ s3_options.ConfigureDefaultCredentials();
}
+
// Prefer AWS service-specific endpoint url
auto s3_endpoint_env = arrow::internal::GetEnvVar(kAwsEndpointUrlS3EnvVar);
if (s3_endpoint_env.ok()) {
- options.endpoint_override = *s3_endpoint_env;
+ s3_options.endpoint_override = *s3_endpoint_env;
} else {
auto endpoint_env = arrow::internal::GetEnvVar(kAwsEndpointUrlEnvVar);
if (endpoint_env.ok()) {
- options.endpoint_override = *endpoint_env;
+ s3_options.endpoint_override = *endpoint_env;
}
}
bool region_set = false;
for (const auto& kv : options_map) {
if (kv.first == "region") {
- options.region = kv.second;
+ s3_options.region = kv.second;
region_set = true;
} else if (kv.first == "scheme") {
- options.scheme = kv.second;
+ s3_options.scheme = kv.second;
} else if (kv.first == "endpoint_override") {
- options.endpoint_override = kv.second;
+ s3_options.endpoint_override = kv.second;
} else if (kv.first == "allow_delayed_open") {
- ARROW_ASSIGN_OR_RAISE(options.allow_delayed_open,
+ ARROW_ASSIGN_OR_RAISE(s3_options.allow_delayed_open,
::arrow::internal::ParseBoolean(kv.second));
} else if (kv.first == "allow_bucket_creation") {
- ARROW_ASSIGN_OR_RAISE(options.allow_bucket_creation,
+ ARROW_ASSIGN_OR_RAISE(s3_options.allow_bucket_creation,
::arrow::internal::ParseBoolean(kv.second));
} else if (kv.first == "allow_bucket_deletion") {
- ARROW_ASSIGN_OR_RAISE(options.allow_bucket_deletion,
+ ARROW_ASSIGN_OR_RAISE(s3_options.allow_bucket_deletion,
::arrow::internal::ParseBoolean(kv.second));
} else if (kv.first == "tls_ca_file_path") {
- options.tls_ca_file_path = kv.second;
+ s3_options.tls_ca_file_path = kv.second;
} else if (kv.first == "tls_ca_dir_path") {
- options.tls_ca_dir_path = kv.second;
+ s3_options.tls_ca_dir_path = kv.second;
} else if (kv.first == "tls_verify_certificates") {
- ARROW_ASSIGN_OR_RAISE(options.tls_verify_certificates,
+ ARROW_ASSIGN_OR_RAISE(s3_options.tls_verify_certificates,
::arrow::internal::ParseBoolean(kv.second));
} else if (kv.first == "smart_defaults") {
- options.smart_defaults = kv.second;
+ s3_options.smart_defaults = kv.second;
} else {
return Status::Invalid("Unexpected query parameter in S3 URI: '",
kv.first, "'");
}
}
- if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
+ if (retry_strategy) {
+ s3_options.retry_strategy = std::move(retry_strategy);
+ }
+ if (default_metadata) {
+ s3_options.default_metadata = std::move(default_metadata);
+ }
+
+ if (!region_set && !bucket.empty() && s3_options.endpoint_override.empty()) {
// XXX Should we use a dedicated resolver with the given credentials?
- ARROW_ASSIGN_OR_RAISE(options.region, ResolveS3BucketRegion(bucket));
+ ARROW_ASSIGN_OR_RAISE(s3_options.region, ResolveS3BucketRegion(bucket));
}
- return options;
+ return s3_options;
}
-Result<S3Options> S3Options::FromUri(const std::string& uri_string,
- std::string* out_path) {
+Result<S3Options> S3Options::FromUriAndOptions(const std::string& uri_string,
+ const FileSystemFactoryOptions&
options,
+ std::string* out_path) {
Uri uri;
RETURN_NOT_OK(uri.Parse(uri_string));
- return FromUri(uri, out_path);
+ return FromUriAndOptions(uri, options, out_path);
}
bool S3Options::Equals(const S3Options& other) const {
@@ -3611,11 +3671,13 @@ Result<std::string> ResolveS3BucketRegion(const
std::string& bucket) {
auto kS3FileSystemModule = ARROW_REGISTER_FILESYSTEM(
"s3",
- [](const arrow::util::Uri& uri, const io::IOContext& io_context,
+ [](const arrow::util::Uri& uri, const FileSystemFactoryOptions& options,
+ const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<fs::FileSystem>> {
RETURN_NOT_OK(EnsureS3Initialized());
- ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path));
- return S3FileSystem::Make(options, io_context);
+ ARROW_ASSIGN_OR_RAISE(auto s3_options,
+ S3Options::FromUriAndOptions(uri, options,
out_path));
+ return S3FileSystem::Make(s3_options, io_context);
},
[] { DCHECK_OK(EnsureS3Finalized()); });
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index 158d70a93f..47d249898b 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -17,6 +17,7 @@
#pragma once
+#include <any>
#include <memory>
#include <string>
#include <vector>
@@ -290,6 +291,23 @@ struct ARROW_EXPORT S3Options {
std::string* out_path = NULLPTR);
static Result<S3Options> FromUri(const std::string& uri,
std::string* out_path = NULLPTR);
+
+ /// Equivalent to FromUri() with specific backend options that can't be
represented
+ /// on the URI or are better kept out of it (such as credentials).
+ /// Each option is a (name, value) pair. Recognized keys:
+ /// - "access_key" (std::string)
+ /// - "secret_key" (std::string)
+ /// - "session_token" (std::string)
+ /// - "retry_strategy" (std::shared_ptr<S3RetryStrategy>)
+ /// - "default_metadata" (std::shared_ptr<const KeyValueMetadata>)
+ /// Options appearing both in the URI and in the options list return
Status::Invalid;
+ /// unknown keys or invalid values return Status::Invalid.
+ static Result<S3Options> FromUriAndOptions(const ::arrow::util::Uri& uri,
+ const FileSystemFactoryOptions&
options,
+ std::string* out_path = NULLPTR);
+ static Result<S3Options> FromUriAndOptions(const std::string& uri,
+ const FileSystemFactoryOptions&
options,
+ std::string* out_path = NULLPTR);
};
/// S3-backed FileSystem implementation.
diff --git a/cpp/src/arrow/filesystem/s3fs_module_test.cc
b/cpp/src/arrow/filesystem/s3fs_module_test.cc
index 987a8979b2..f056b7fe55 100644
--- a/cpp/src/arrow/filesystem/s3fs_module_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_module_test.cc
@@ -16,6 +16,7 @@
// under the License.
#include <algorithm>
+#include <any>
#include <exception>
#include <memory>
#include <sstream>
@@ -82,4 +83,54 @@ TEST(S3Test, FromUri) {
"&allow_bucket_creation=0&allow_bucket_deletion=0");
}
+TEST(S3Test, FromUriAndOptionsCredentials) {
+ ASSERT_OK_AND_ASSIGN(auto minio, GetMinioEnv()->GetOneServer());
+ std::string path;
+ FileSystemFactoryOptions options{
+ {"access_key", std::string(minio->access_key())},
+ {"secret_key", std::string(minio->secret_key())},
+ };
+ // Credentials supplied via options, NOT in the URI.
+ ASSERT_OK_AND_ASSIGN(
+ auto fs,
+ FileSystemFromUriAndOptions("s3://bucket/somedir/subdir/subfile",
options, &path));
+ // They crossed the module boundary and were applied -> reflected in MakeUri.
+ EXPECT_EQ(fs->MakeUri("/" + path),
+ "s3://minio:miniopass@bucket/somedir/subdir/subfile"
+ "?region=us-east-1&scheme=https&endpoint_override="
+ "&allow_bucket_creation=0&allow_bucket_deletion=0");
+}
+
+namespace {
+class NoopRetryStrategy : public S3RetryStrategy {
+ public:
+ bool ShouldRetry(const AWSErrorDetail&, int64_t) override { return false; }
+ int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail&, int64_t)
override {
+ return 0;
+ }
+};
+} // namespace
+
+TEST(S3Test, FromUriAndOptionsRetryStrategy) {
+ ASSERT_OK_AND_ASSIGN(auto minio, GetMinioEnv()->GetOneServer());
+ FileSystemFactoryOptions options{
+ {"access_key", std::string(minio->access_key())},
+ {"secret_key", std::string(minio->secret_key())},
+ {"retry_strategy",
+
std::shared_ptr<S3RetryStrategy>(std::make_shared<NoopRetryStrategy>())},
+ };
+ std::string path;
+ ASSERT_OK_AND_ASSIGN(
+ auto fs,
+ FileSystemFromUriAndOptions("s3://bucket/somedir/subdir/subfile",
options, &path));
+ ASSERT_NE(fs, nullptr);
+}
+
+TEST(S3Test, FromUriRejectsUnknownOptions) {
+ FileSystemFactoryOptions options{{"some_option", 1}};
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ Invalid, ::testing::HasSubstr("Unexpected option"),
+ FileSystemFromUriAndOptions("s3://bucket/key", options));
+}
+
} // namespace arrow::fs
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc
b/cpp/src/arrow/filesystem/s3fs_test.cc
index 52c2621249..d513e9ca97 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -398,6 +398,96 @@ TEST_F(S3OptionsTest, FromAccessKey) {
ASSERT_EQ(options.GetSessionToken(), "token");
}
+TEST_F(S3OptionsTest, FromUriAndOptionsCredentials) {
+ std::string path;
+ S3Options options;
+ FileSystemFactoryOptions kv{
+ {"access_key", std::string("ak")},
+ {"secret_key", std::string("sk")},
+ };
+ ASSERT_OK_AND_ASSIGN(options, S3Options::FromUriAndOptions("s3://", kv,
&path));
+ ASSERT_EQ(options.GetAccessKey(), "ak");
+ ASSERT_EQ(options.GetSecretKey(), "sk");
+ ASSERT_EQ(options.GetSessionToken(), "");
+
+ kv.push_back({"session_token", std::string("tok")});
+ ASSERT_OK_AND_ASSIGN(options, S3Options::FromUriAndOptions("s3://", kv,
&path));
+ ASSERT_EQ(options.GetAccessKey(), "ak");
+ ASSERT_EQ(options.GetSecretKey(), "sk");
+ ASSERT_EQ(options.GetSessionToken(), "tok");
+
+ // Failure scenarios
+ // Pairing is incorrect
+ ASSERT_THAT(
+ S3Options::FromUriAndOptions("s3://", {{"access_key",
std::string("ak")}}, &path),
+ Raises(StatusCode::Invalid, ::testing::HasSubstr("must be provided
together")));
+ ASSERT_THAT(
+ S3Options::FromUriAndOptions("s3://", {{"session_token",
std::string("tok")}},
+ &path),
+ Raises(StatusCode::Invalid, ::testing::HasSubstr("session_token'
requires")));
+ // unknown option key
+ ASSERT_THAT(S3Options::FromUriAndOptions("s3://", {{"bogus",
std::string("x")}}, &path),
+ Raises(StatusCode::Invalid, ::testing::HasSubstr("Unexpected
option")));
+ // const char* is rejected. Options must be an explicit std::string
+ ASSERT_THAT(S3Options::FromUriAndOptions("s3://", {{"access_key", "42"}},
&path),
+ Raises(StatusCode::Invalid, ::testing::HasSubstr("wrong type")));
+}
+
+TEST_F(S3OptionsTest, FromUriAndOptionsCredentialConflict) {
+ std::string path;
+ S3Options options;
+ FileSystemFactoryOptions kv{
+ {"access_key", std::string("opt_access_key")},
+ {"secret_key", std::string("opt_secret_key")},
+ };
+
+ ASSERT_OK_AND_ASSIGN(
+ options, S3Options::FromUriAndOptions("s3://mybucket?region=us-east-1",
kv, &path));
+ ASSERT_EQ(options.GetAccessKey(), "opt_access_key");
+ ASSERT_EQ(options.GetSecretKey(), "opt_secret_key");
+
+ ASSERT_OK_AND_ASSIGN(
+ options,
+ S3Options::FromUriAndOptions(
+ "s3://uri_access_key:uri_secret_key@mybucket?region=us-east-1", {},
&path));
+ ASSERT_EQ(options.GetAccessKey(), "uri_access_key");
+ ASSERT_EQ(options.GetSecretKey(), "uri_secret_key");
+
+ // Providing credentials both in the URI and in the options is Invalid.
+ ASSERT_THAT(
+ S3Options::FromUriAndOptions(
+ "s3://uri_access_key:uri_secret_key@mybucket?region=us-east-1", kv,
&path),
+ Raises(StatusCode::Invalid, ::testing::HasSubstr("provided both")));
+}
+
+TEST_F(S3OptionsTest, FromUriAndOptionsRetryStrategy) {
+ std::string path;
+ S3Options options;
+ auto retry = S3RetryStrategy::GetAwsDefaultRetryStrategy(/*max_attempts=*/3);
+ FileSystemFactoryOptions kv{{"retry_strategy", retry}};
+ ASSERT_OK_AND_ASSIGN(options, S3Options::FromUriAndOptions("s3://", kv,
&path));
+ // The exact typed object crossed through std::any unchanged.
+ ASSERT_EQ(options.retry_strategy, retry);
+
+ // wrong type is rejected, same as the credential keys
+ ASSERT_THAT(S3Options::FromUriAndOptions("s3://",
+ {{"retry_strategy",
std::string("x")}}, &path),
+ Raises(StatusCode::Invalid, ::testing::HasSubstr("wrong type")));
+}
+
+TEST_F(S3OptionsTest, FromUriAndOptionsDefaultMetadata) {
+ std::string path;
+ S3Options options;
+ auto metadata = KeyValueMetadata::Make({"Content-Type"}, {"x-arrow/test"});
+ FileSystemFactoryOptions kv{{"default_metadata", metadata}};
+ ASSERT_OK_AND_ASSIGN(options, S3Options::FromUriAndOptions("s3://", kv,
&path));
+ ASSERT_EQ(options.default_metadata, metadata);
+
+ ASSERT_THAT(S3Options::FromUriAndOptions(
+ "s3://", {{"default_metadata", std::string("x")}}, &path),
+ Raises(StatusCode::Invalid, ::testing::HasSubstr("wrong type")));
+}
+
TEST_F(S3OptionsTest, FromAssumeRole) {
S3Options options;
diff --git a/cpp/src/arrow/filesystem/util_internal.h
b/cpp/src/arrow/filesystem/util_internal.h
index 62e253ebc1..220640b657 100644
--- a/cpp/src/arrow/filesystem/util_internal.h
+++ b/cpp/src/arrow/filesystem/util_internal.h
@@ -105,4 +105,20 @@ extern FileSystemGlobalOptions global_options;
ARROW_EXPORT
Status UnregisterFileSystemFactory(const std::string& scheme);
+template <typename T>
+Result<T> GetOption(const std::string& key, const std::any& value) {
+ if (const auto* v = std::any_cast<T>(&value)) {
+ return *v;
+ }
+ return Status::Invalid("Filesystem option '", key, "' has the wrong type");
+}
+
+template <typename T>
+Result<std::shared_ptr<const T>> GetConstSharedPtrOption(const std::string&
key,
+ const std::any&
value) {
+ if (const auto* v = std::any_cast<std::shared_ptr<const T>>(&value)) return
*v;
+ if (const auto* v = std::any_cast<std::shared_ptr<T>>(&value)) return *v;
+ return Status::Invalid("Filesystem option '", key, "' has the wrong type");
+}
+
} // namespace arrow::fs::internal
diff --git a/cpp/src/arrow/testing/examplefs.cc
b/cpp/src/arrow/testing/examplefs.cc
index 5c9d5f9d90..24b1d2eab9 100644
--- a/cpp/src/arrow/testing/examplefs.cc
+++ b/cpp/src/arrow/testing/examplefs.cc
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+#include <any>
+
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/filesystem_library.h"
#include "arrow/result.h"
+#include "arrow/testing/examplefs.h"
#include "arrow/util/uri.h"
#include <gtest/gtest.h>
@@ -26,12 +29,41 @@ namespace arrow::fs {
auto kExampleFileSystemModule = ARROW_REGISTER_FILESYSTEM(
"example",
- [](const Uri& uri, const io::IOContext& io_context,
+ [](const Uri& uri, const FileSystemFactoryOptions& options,
+ const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<FileSystem>> {
constexpr std::string_view kScheme = "example";
EXPECT_EQ(uri.scheme(), kScheme);
auto local_uri = "file" + uri.ToString().substr(kScheme.size());
- return FileSystemFromUri(local_uri, io_context, out_path);
+ ARROW_ASSIGN_OR_RAISE(auto fs, FileSystemFromUri(local_uri, io_context,
out_path));
+ for (const auto& [key, value] : options) {
+ EXPECT_TRUE(value.has_value());
+ if (key == "example_option_string") {
+ if (const auto* s = std::any_cast<std::string>(&value)) {
+ if (out_path != nullptr) *out_path += "/" + *s;
+ } else {
+ ADD_FAILURE() << "example_option_string has wrong type";
+ }
+ } else if (key == "example_option_int") {
+ if (const auto* i = std::any_cast<int>(&value)) {
+ if (out_path != nullptr) *out_path += "/" + std::to_string(*i);
+ } else {
+ ADD_FAILURE() << "example_option_int has wrong type";
+ }
+ } else if (key == "example_typed_option") {
+ if (const auto* opt =
+ std::any_cast<std::shared_ptr<ExampleTypedOption>>(&value)) {
+ if (out_path != nullptr) {
+ *out_path += "/" + std::to_string((*opt)->value());
+ }
+ } else {
+ ADD_FAILURE() << "example_typed_option has wrong type";
+ }
+ } else {
+ ADD_FAILURE() << "Unexpected option: " << key;
+ }
+ }
+ return fs;
},
{});
diff --git a/cpp/src/arrow/testing/examplefs.cc
b/cpp/src/arrow/testing/examplefs.h
similarity index 58%
copy from cpp/src/arrow/testing/examplefs.cc
copy to cpp/src/arrow/testing/examplefs.h
index 5c9d5f9d90..e04b84c982 100644
--- a/cpp/src/arrow/testing/examplefs.cc
+++ b/cpp/src/arrow/testing/examplefs.h
@@ -15,24 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-#include "arrow/filesystem/filesystem.h"
-#include "arrow/filesystem/filesystem_library.h"
-#include "arrow/result.h"
-#include "arrow/util/uri.h"
-
-#include <gtest/gtest.h>
+#pragma once
namespace arrow::fs {
-auto kExampleFileSystemModule = ARROW_REGISTER_FILESYSTEM(
- "example",
- [](const Uri& uri, const io::IOContext& io_context,
- std::string* out_path) -> Result<std::shared_ptr<FileSystem>> {
- constexpr std::string_view kScheme = "example";
- EXPECT_EQ(uri.scheme(), kScheme);
- auto local_uri = "file" + uri.ToString().substr(kScheme.size());
- return FileSystemFromUri(local_uri, io_context, out_path);
- },
- {});
+class ExampleTypedOption {
+ public:
+ virtual ~ExampleTypedOption() = default;
+ virtual int value() const = 0;
+};
} // namespace arrow::fs