adamdebreceni commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r564325731



##########
File path: extensions/aws/s3/S3Wrapper.cpp
##########
@@ -30,46 +37,253 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional<Aws::S3::Model::PutObjectResult> 
S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& 
request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.PutObject(request);
+void HeadObjectResult::setFilePaths(const std::string& key) {
+  absolute_path = key;
+  std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, 
true /*force_posix*/);
+}
+
+S3Wrapper::S3Wrapper() : 
request_sender_(minifi::utils::make_unique<S3ClientRequestSender>()) {
+}
+
+S3Wrapper::S3Wrapper(std::unique_ptr<S3RequestSender> request_sender) : 
request_sender_(std::move(request_sender)) {
+}
+
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  request_sender_->setCredentials(cred);
+}
+
+void S3Wrapper::setRegion(const Aws::String& region) {
+  request_sender_->setRegion(region);
+}
+
+void S3Wrapper::setTimeout(uint64_t timeout) {
+  request_sender_->setTimeout(timeout);
+}
+
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+  request_sender_->setEndpointOverrideUrl(url);
+}
+
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+  request_sender_->setProxy(proxy);
+}
+
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const {
+  if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == 
CANNED_ACL_MAP.end())
+    return;
+
+  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+  request.SetACL(CANNED_ACL_MAP.at(canned_acl));
+}
+
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+  minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+  const auto match = expr.match(expiration);
+  const auto& results = expr.getResult();
+  if (!match || results.size() < 3)
+    return Expiration{};
+  return Expiration{results[1], results[2]};
+}
+
+std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) 
{
+  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+    return "";
+  }
+
+  auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), 
SERVER_SIDE_ENCRYPTION_MAP.end(),
+    [&](const std::pair<std::string, const 
Aws::S3::Model::ServerSideEncryption&> pair) {
+      return pair.second == encryption;
+    });
+  if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
+    return it->first;
+  }
+  return "";
+}
+
+minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> 
data_stream) {
+  Aws::S3::Model::PutObjectRequest request;
+  request.SetBucket(put_object_params.bucket);
+  request.SetKey(put_object_params.object_key);
+  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+  request.SetContentType(put_object_params.content_type);
+  request.SetMetadata(put_object_params.user_metadata_map);
+  request.SetBody(data_stream);
+  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+  request.SetGrantRead(put_object_params.read_permission_user_list);
+  request.SetGrantReadACP(put_object_params.read_acl_user_list);
+  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+  setCannedAcl(request, put_object_params.canned_acl);
+
+  auto aws_result = request_sender_->sendPutObjectRequest(request);
+  if (!aws_result) {
+    return minifi::utils::nullopt;
+  }
+
+  PutObjectResult result;
+  // Etags are returned by AWS in quoted form that should be removed
+  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
+  result.version = aws_result->GetVersionId();
+
+  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+  // s3.expiration only needs the date member of this pair
+  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
+  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+  return result;
+}
+
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& 
object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+    request.SetVersionId(version);
+  }
+  return request_sender_->sendDeleteObjectRequest(request);
+}
+
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t 
data_size, const std::shared_ptr<io::BaseStream>& output) {
+  static const uint64_t BUFFER_SIZE = 4096;
+  std::vector<uint8_t> buffer;
+  buffer.reserve(BUFFER_SIZE);
 
-  if (outcome.IsSuccess()) {
-      logger_->log_info("Added S3 object '%s' to bucket '%s'", 
request.GetKey(), request.GetBucket());
-      return outcome.GetResultWithOwnership();
-  } else {
-    logger_->log_error("PutS3Object failed with the following: '%s'", 
outcome.GetError().GetMessage());
+  int64_t write_size = 0;
+  while (write_size < data_size) {
+    auto next_write_size = data_size - write_size < BUFFER_SIZE ? data_size - 
write_size : BUFFER_SIZE;
+    if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size)) 
{

Review comment:
       I see now that `readsome` would not be a wise choice here, I just wanted 
to avoid passing in the body length, if we read until `eof` anyway, but I see 
no indication that the two (reading until body length and reading until `eof`) 
are equivalent 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to