wgtmac commented on code in PR #548:
URL: https://github.com/apache/iceberg-cpp/pull/548#discussion_r2781430013


##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -144,6 +145,40 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
                       std::move(endpoints)));
 }
 
+Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
+    const RestCatalogProperties& config) {
+  // Get warehouse location to determine the appropriate FileIO type
+  auto warehouse = config.Get(RestCatalogProperties::kWarehouse);
+  if (warehouse.empty()) {
+    return InvalidArgument(
+        "Warehouse location is required when FileIO is not explicitly provided. "
+        "Set the 'warehouse' property to an S3 URI (s3://...) or local path.");
+  }
+
+  // Check for user-specified io-impl property
+  auto io_impl = config.configs().find(FileIOProperties::kImpl);
+  std::string impl_name;
+
+  if (io_impl != config.configs().end() && !io_impl->second.empty()) {
+    // User specified a custom io-impl
+    impl_name = io_impl->second;
+  } else {
+    // Use default based on warehouse URI scheme
+    if (warehouse.rfind("s3://", 0) == 0) {

Review Comment:
   Why are we using `rfind` here?



##########
cmake_modules/IcebergThirdpartyToolchain.cmake:
##########
@@ -87,6 +87,7 @@ function(resolve_arrow_dependency)
   # Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
   set(ARROW_IPC ON)
   set(ARROW_FILESYSTEM ON)
+  set(ARROW_S3 ON)

Review Comment:
   Can we add a CMake option `ICEBERG_S3` and only turn on `ARROW_S3` when 
`ICEBERG_S3` is on?



##########
src/iceberg/catalog/rest/rest_catalog.h:
##########
@@ -53,6 +53,30 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
   static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config,
                                                    std::shared_ptr<FileIO> file_io);
 
+  /// \brief Create a RestCatalog instance with auto-detected FileIO.
+  ///
+  /// This overload automatically creates an appropriate FileIO based on the "io-impl"
+  /// property or the warehouse location URI scheme.
+  ///
+  /// FileIO selection logic:

Review Comment:
   It would be better to add an `iceberg/util/file_io_util.h` to handle this logic and 
make it reusable. Please note that Arrow Filesystem support is only available in 
the iceberg-bundle library, so here we can only go through the FileIO registry to 
create a FileIO instance.



##########
src/iceberg/arrow/arrow_s3_file_io.cc:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <mutex>
+#include <string_view>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/localfs.h>
+#if __has_include(<arrow/filesystem/s3fs.h>)

Review Comment:
   If we add `ICEBERG_S3` option, we don't need to deal with this check.



##########
src/iceberg/arrow/arrow_s3_file_io.cc:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <mutex>
+#include <string_view>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/localfs.h>
+#if __has_include(<arrow/filesystem/s3fs.h>)
+#include <arrow/filesystem/s3fs.h>
+#define ICEBERG_ARROW_HAS_S3 1
+#else
+#define ICEBERG_ARROW_HAS_S3 0
+#endif
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/s3_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+bool IsS3Uri(std::string_view uri) { return uri.rfind("s3://", 0) == 0; }
+
+Status EnsureS3Initialized() {
+#if ICEBERG_ARROW_HAS_S3
+  static std::once_flag init_flag;
+  static ::arrow::Status init_status = ::arrow::Status::OK();
+  std::call_once(init_flag, []() {
+    ::arrow::fs::S3GlobalOptions options;
+    init_status = ::arrow::fs::InitializeS3(options);
+    if (init_status.ok()) {
+      std::atexit([]() { (void)::arrow::fs::FinalizeS3(); });
+    }
+  });
+  if (!init_status.ok()) {
+    return std::unexpected<Error>{
+        {.kind = ::iceberg::arrow::ToErrorKind(init_status),
+         .message = init_status.ToString()}};
+  }
+  return {};
+#else
+  return NotImplemented("Arrow S3 support is not enabled");
+#endif
+}
+
+#if ICEBERG_ARROW_HAS_S3
+/// \brief Configure S3Options from a properties map.
+///
+/// \param properties The configuration properties map.
+/// \return Configured S3Options.
+::arrow::fs::S3Options ConfigureS3Options(
+    const std::unordered_map<std::string, std::string>& properties) {
+  ::arrow::fs::S3Options options;
+
+  // Configure credentials
+  auto access_key_it = properties.find(S3Properties::kAccessKeyId);

Review Comment:
   Where is `S3Properties` defined?



##########
src/iceberg/arrow/arrow_s3_file_io.cc:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <mutex>
+#include <string_view>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/localfs.h>
+#if __has_include(<arrow/filesystem/s3fs.h>)
+#include <arrow/filesystem/s3fs.h>
+#define ICEBERG_ARROW_HAS_S3 1
+#else
+#define ICEBERG_ARROW_HAS_S3 0
+#endif
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/s3_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+bool IsS3Uri(std::string_view uri) { return uri.rfind("s3://", 0) == 0; }
+
+Status EnsureS3Initialized() {
+#if ICEBERG_ARROW_HAS_S3
+  static std::once_flag init_flag;
+  static ::arrow::Status init_status = ::arrow::Status::OK();
+  std::call_once(init_flag, []() {
+    ::arrow::fs::S3GlobalOptions options;
+    init_status = ::arrow::fs::InitializeS3(options);
+    if (init_status.ok()) {
+      std::atexit([]() { (void)::arrow::fs::FinalizeS3(); });
+    }
+  });
+  if (!init_status.ok()) {
+    return std::unexpected<Error>{
+        {.kind = ::iceberg::arrow::ToErrorKind(init_status),
+         .message = init_status.ToString()}};
+  }
+  return {};
+#else
+  return NotImplemented("Arrow S3 support is not enabled");
+#endif
+}
+
+#if ICEBERG_ARROW_HAS_S3
+/// \brief Configure S3Options from a properties map.
+///
+/// \param properties The configuration properties map.
+/// \return Configured S3Options.
+::arrow::fs::S3Options ConfigureS3Options(

Review Comment:
   I agree this is something that we need.



##########
src/iceberg/arrow/arrow_s3_file_io.cc:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <mutex>
+#include <string_view>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/filesystem/localfs.h>
+#if __has_include(<arrow/filesystem/s3fs.h>)
+#include <arrow/filesystem/s3fs.h>
+#define ICEBERG_ARROW_HAS_S3 1
+#else
+#define ICEBERG_ARROW_HAS_S3 0
+#endif
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/s3_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+bool IsS3Uri(std::string_view uri) { return uri.rfind("s3://", 0) == 0; }
+
+Status EnsureS3Initialized() {
+#if ICEBERG_ARROW_HAS_S3
+  static std::once_flag init_flag;
+  static ::arrow::Status init_status = ::arrow::Status::OK();
+  std::call_once(init_flag, []() {
+    ::arrow::fs::S3GlobalOptions options;
+    init_status = ::arrow::fs::InitializeS3(options);
+    if (init_status.ok()) {
+      std::atexit([]() { (void)::arrow::fs::FinalizeS3(); });
+    }
+  });
+  if (!init_status.ok()) {
+    return std::unexpected<Error>{
+        {.kind = ::iceberg::arrow::ToErrorKind(init_status),
+         .message = init_status.ToString()}};
+  }
+  return {};
+#else
+  return NotImplemented("Arrow S3 support is not enabled");
+#endif
+}
+
+#if ICEBERG_ARROW_HAS_S3
+/// \brief Configure S3Options from a properties map.
+///
+/// \param properties The configuration properties map.
+/// \return Configured S3Options.
+::arrow::fs::S3Options ConfigureS3Options(
+    const std::unordered_map<std::string, std::string>& properties) {
+  ::arrow::fs::S3Options options;
+
+  // Configure credentials
+  auto access_key_it = properties.find(S3Properties::kAccessKeyId);
+  auto secret_key_it = properties.find(S3Properties::kSecretAccessKey);
+  auto session_token_it = properties.find(S3Properties::kSessionToken);
+
+  if (access_key_it != properties.end() && secret_key_it != properties.end()) {
+    if (session_token_it != properties.end()) {
+      options.ConfigureAccessKey(access_key_it->second, secret_key_it->second,
+                                 session_token_it->second);
+    } else {
+      options.ConfigureAccessKey(access_key_it->second, secret_key_it->second);
+    }
+  } else {
+    // Use default credential chain (environment, instance profile, etc.)
+    options.ConfigureDefaultCredentials();
+  }
+
+  // Configure region
+  auto region_it = properties.find(S3Properties::kRegion);
+  if (region_it != properties.end()) {
+    options.region = region_it->second;
+  }
+
+  // Configure endpoint (for MinIO, LocalStack, etc.)
+  auto endpoint_it = properties.find(S3Properties::kEndpoint);
+  if (endpoint_it != properties.end()) {
+    options.endpoint_override = endpoint_it->second;
+  }
+
+  // Configure path-style access (needed for MinIO)
+  auto path_style_it = properties.find(S3Properties::kPathStyleAccess);
+  if (path_style_it != properties.end()) {
+    // Arrow's S3 path-style is controlled via endpoint scheme
+    // For path-style access, we need to ensure the endpoint is properly configured
+  }
+
+  // Configure SSL
+  auto ssl_it = properties.find(S3Properties::kSslEnabled);
+  if (ssl_it != properties.end() && ssl_it->second == "false") {
+    options.scheme = "http";
+  }
+
+  // Configure timeouts
+  auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs);
+  if (connect_timeout_it != properties.end()) {
+    options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0;
+  }
+
+  auto socket_timeout_it = properties.find(S3Properties::kSocketTimeoutMs);
+  if (socket_timeout_it != properties.end()) {
+    options.request_timeout = std::stod(socket_timeout_it->second) / 1000.0;
+  }
+
+  return options;
+}
+
+/// \brief Create an S3 FileSystem with the given options.
+///
+/// \param options The S3Options to use.
+/// \return A shared_ptr to the S3FileSystem, or an error.
+Result<std::shared_ptr<::arrow::fs::FileSystem>> MakeS3FileSystem(
+    const ::arrow::fs::S3Options& options) {
+  ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options));
+  return fs;
+}
+#endif
+
+Result<std::shared_ptr<::arrow::fs::FileSystem>> ResolveFileSystemFromUri(
+    const std::string& uri, std::string* out_path) {
+  if (IsS3Uri(uri)) {
+    ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
+  }
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::FileSystemFromUri(uri, out_path));
+  return fs;
+}
+
+/// \brief ArrowUriFileIO resolves FileSystem from URI for each operation.
+///
+/// This implementation is thread-safe as it creates a new FileSystem instance
+/// for each operation. However, it may be less efficient than caching the
+/// FileSystem. S3 initialization is done once per process.
+class ArrowUriFileIO : public FileIO {

Review Comment:
   Why do we need this instead of reusing `ArrowFileSystemFileIO`?



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -144,6 +145,40 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
                       std::move(endpoints)));
 }
 
+Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
+    const RestCatalogProperties& config) {
+  // Get warehouse location to determine the appropriate FileIO type
+  auto warehouse = config.Get(RestCatalogProperties::kWarehouse);
+  if (warehouse.empty()) {
+    return InvalidArgument(
+        "Warehouse location is required when FileIO is not explicitly provided. "
+        "Set the 'warehouse' property to an S3 URI (s3://...) or local path.");
+  }
+
+  // Check for user-specified io-impl property
+  auto io_impl = config.configs().find(FileIOProperties::kImpl);
+  std::string impl_name;
+
+  if (io_impl != config.configs().end() && !io_impl->second.empty()) {
+    // User specified a custom io-impl
+    impl_name = io_impl->second;
+  } else {
+    // Use default based on warehouse URI scheme
+    if (warehouse.rfind("s3://", 0) == 0) {

Review Comment:
   BTW, shouldn't we use the `uri` property instead of the `warehouse` property?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to