adamdebreceni commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560987684



##########
File path: extensions/aws/s3/S3Wrapper.cpp
##########
@@ -30,46 +37,253 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional<Aws::S3::Model::PutObjectResult> 
S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& 
request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.PutObject(request);
+void HeadObjectResult::setFilePaths(const std::string& key) {
+  absolute_path = key;
+  std::tie(path, filename) = minifi::utils::file::FileUtils::split_path(key, 
true /*force_posix*/);
+}
+
+S3Wrapper::S3Wrapper() : 
request_sender_(minifi::utils::make_unique<S3ClientRequestSender>()) {
+}
+
+S3Wrapper::S3Wrapper(std::unique_ptr<S3RequestSender> request_sender) : 
request_sender_(std::move(request_sender)) {
+}
+
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  request_sender_->setCredentials(cred);
+}
+
+void S3Wrapper::setRegion(const Aws::String& region) {
+  request_sender_->setRegion(region);
+}
+
+void S3Wrapper::setTimeout(uint64_t timeout) {
+  request_sender_->setTimeout(timeout);
+}
+
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+  request_sender_->setEndpointOverrideUrl(url);
+}
+
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+  request_sender_->setProxy(proxy);
+}
+
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const 
std::string& canned_acl) const {
+  if (canned_acl.empty() || CANNED_ACL_MAP.find(canned_acl) == 
CANNED_ACL_MAP.end())
+    return;
+
+  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+  request.SetACL(CANNED_ACL_MAP.at(canned_acl));
+}
+
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+  minifi::utils::Regex expr("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+  const auto match = expr.match(expiration);
+  const auto& results = expr.getResult();
+  if (!match || results.size() < 3)
+    return Expiration{};
+  return Expiration{results[1], results[2]};
+}
+
+std::string 
S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) 
{
+  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+    return "";
+  }
+
+  auto it = std::find_if(SERVER_SIDE_ENCRYPTION_MAP.begin(), 
SERVER_SIDE_ENCRYPTION_MAP.end(),
+    [&](const std::pair<std::string, const 
Aws::S3::Model::ServerSideEncryption&> pair) {
+      return pair.second == encryption;
+    });
+  if (it != SERVER_SIDE_ENCRYPTION_MAP.end()) {
+    return it->first;
+  }
+  return "";
+}
+
+minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const 
PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> 
data_stream) {
+  Aws::S3::Model::PutObjectRequest request;
+  request.SetBucket(put_object_params.bucket);
+  request.SetKey(put_object_params.object_key);
+  
request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+  
request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+  request.SetContentType(put_object_params.content_type);
+  request.SetMetadata(put_object_params.user_metadata_map);
+  request.SetBody(data_stream);
+  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+  request.SetGrantRead(put_object_params.read_permission_user_list);
+  request.SetGrantReadACP(put_object_params.read_acl_user_list);
+  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+  setCannedAcl(request, put_object_params.canned_acl);
+
+  auto aws_result = request_sender_->sendPutObjectRequest(request);
+  if (!aws_result) {
+    return minifi::utils::nullopt;
+  }
+
+  PutObjectResult result;
+  // Etags are returned by AWS in quoted form that should be removed
+  result.etag = 
minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
+  result.version = aws_result->GetVersionId();
+
+  // GetExpiration returns a string pair with a date and a ruleid in 
'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+  // s3.expiration only needs the date member of this pair
+  result.expiration = 
getExpiration(aws_result->GetExpiration()).expiration_time;
+  result.ssealgorithm = 
getEncryptionString(aws_result->GetServerSideEncryption());
+  return result;
+}
+
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& 
object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest request;
+  request.SetBucket(bucket);
+  request.SetKey(object_key);
+  if (!version.empty()) {
+    request.SetVersionId(version);
+  }
+  return request_sender_->sendDeleteObjectRequest(request);
+}
+
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t 
data_size, const std::shared_ptr<io::BaseStream>& output) {
+  static const uint64_t BUFFER_SIZE = 4096;
+  std::vector<uint8_t> buffer;
+  buffer.reserve(BUFFER_SIZE);
 
-  if (outcome.IsSuccess()) {
-      logger_->log_info("Added S3 object '%s' to bucket '%s'", 
request.GetKey(), request.GetBucket());
-      return outcome.GetResultWithOwnership();
-  } else {
-    logger_->log_error("PutS3Object failed with the following: '%s'", 
outcome.GetError().GetMessage());
+  int64_t write_size = 0;
+  while (write_size < data_size) {
+    auto next_write_size = data_size - write_size < BUFFER_SIZE ? data_size - 
write_size : BUFFER_SIZE;
+    if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size)) 
{
+      return -1;
+    }
+    auto ret = output->write(buffer.data(), next_write_size);
+    if (ret < 0) {
+      return ret;
+    }
+    write_size += next_write_size;
+  }
+  return write_size;
+}
+
+minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const 
GetObjectRequestParameters& get_object_params, const 
std::shared_ptr<io::BaseStream>& out_body) {
+  auto request = 
createFetchObjectRequest<Aws::S3::Model::GetObjectRequest>(get_object_params);
+  auto aws_result = request_sender_->sendGetObjectRequest(request);
+  if (!aws_result) {
     return minifi::utils::nullopt;
   }
+  auto result = fillFetchObjectResult<Aws::S3::Model::GetObjectResult, 
GetObjectResult>(get_object_params, aws_result.value());
+  result.write_size = writeFetchedBody(aws_result->GetBody(), 
aws_result->GetContentLength(), out_body);
+  return result;
+}
+
+void S3Wrapper::addListResults(const 
Aws::Vector<Aws::S3::Model::ObjectVersion>& content, const uint64_t 
min_object_age, std::vector<ListedObjectAttributes>& listed_objects) {
+  for (const auto& version : content) {
+    if (last_bucket_list_timestamp_ - min_object_age < 
version.GetLastModified().Millis()) {
+      logger_->log_debug("Object version '%s' of key '%s' skipped due to 
minimum object age filter", version.GetVersionId(), version.GetKey());
+      continue;
+    }
+
+    ListedObjectAttributes attributes;
+    attributes.etag = 
minifi::utils::StringUtils::removeFramingCharacters(version.GetETag(), '"');
+    attributes.filename = version.GetKey();
+    attributes.is_latest = version.GetIsLatest();
+    attributes.last_modified = version.GetLastModified().Millis();
+    attributes.length = version.GetSize();
+    attributes.store_class = 
VERSION_STORAGE_CLASS_MAP.at(version.GetStorageClass());
+    attributes.version = version.GetVersionId();
+    listed_objects.push_back(attributes);
+  }
 }
 
-bool S3Wrapper::sendDeleteObjectRequest(const 
Aws::S3::Model::DeleteObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  Aws::S3::Model::DeleteObjectOutcome outcome = 
s3_client.DeleteObject(request);
+void S3Wrapper::addListResults(const Aws::Vector<Aws::S3::Model::Object>& 
content, const uint64_t min_object_age, std::vector<ListedObjectAttributes>& 
listed_objects) {
+  for (const auto& object : content) {
+    if (last_bucket_list_timestamp_ - min_object_age < 
object.GetLastModified().Millis()) {
+      logger_->log_debug("Object with key '%s' skipped due to minimum object 
age filter", object.GetKey());
+      continue;
+    }
 
-  if (outcome.IsSuccess()) {
-    logger_->log_info("Deleted S3 object '%s' from bucket '%s'", 
request.GetKey(), request.GetBucket());
-    return true;
-  } else if (outcome.GetError().GetErrorType() == 
Aws::S3::S3Errors::NO_SUCH_KEY) {
-    logger_->log_info("S3 object '%s' was not found in bucket '%s'", 
request.GetKey(), request.GetBucket());
-    return true;
-  } else {
-    logger_->log_error("DeleteS3Object failed with the following: '%s'", 
outcome.GetError().GetMessage());
-    return false;
+    ListedObjectAttributes attributes;
+    attributes.etag = 
minifi::utils::StringUtils::removeFramingCharacters(object.GetETag(), '"');
+    attributes.filename = object.GetKey();
+    attributes.is_latest = true;
+    attributes.last_modified = object.GetLastModified().Millis();
+    attributes.length = object.GetSize();
+    attributes.store_class = 
OBJECT_STORAGE_CLASS_MAP.at(object.GetStorageClass());
+    listed_objects.push_back(attributes);
   }
 }
 
-minifi::utils::optional<Aws::S3::Model::GetObjectResult> 
S3Wrapper::sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& 
request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.GetObject(request);
+minifi::utils::optional<std::vector<ListedObjectAttributes>> 
S3Wrapper::listVersions(const ListRequestParameters& params) {
+  auto request = 
createListRequest<Aws::S3::Model::ListObjectVersionsRequest>(params);
+  std::vector<ListedObjectAttributes> attribute_list;
+  nonstd::optional_lite::optional<Aws::S3::Model::ListObjectVersionsResult> 
aws_result;
+  do {
+    aws_result = request_sender_->sendListVersionsRequest(request);
+    if (!aws_result) {
+      return minifi::utils::nullopt;
+    }
+    const auto& versions = aws_result->GetVersions();
+    logger_->log_debug("AWS S3 List operation returned %zu versions. This 
result is truncated: %s", versions.size(), aws_result->GetIsTruncated() ? 
"true" : "false");
+    addListResults(versions, params.min_object_age, attribute_list);
+    if (aws_result->GetIsTruncated()) {
+      request.SetKeyMarker(aws_result->GetNextKeyMarker());
+      request.SetVersionIdMarker(aws_result->GetNextVersionIdMarker());
+    }
+  } while (aws_result->GetIsTruncated());
+
+  return attribute_list;
+}
+
+minifi::utils::optional<std::vector<ListedObjectAttributes>> 
S3Wrapper::listObjects(const ListRequestParameters& params) {
+  auto request = 
createListRequest<Aws::S3::Model::ListObjectsV2Request>(params);
+  std::vector<ListedObjectAttributes> attribute_list;
+  nonstd::optional_lite::optional<Aws::S3::Model::ListObjectsV2Result> 
aws_result;
+  do {
+    aws_result = request_sender_->sendListObjectsRequest(request);
+    if (!aws_result) {
+      return minifi::utils::nullopt;
+    }
+    const auto& objects = aws_result->GetContents();
+    logger_->log_debug("AWS S3 List operation returned %d objects. This result 
is truncated: %s", objects.size(), aws_result->GetIsTruncated() ? "true" : 
"false");

Review comment:
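       `%d` is probably not the proper format specifier here: `objects.size()` yields an unsigned `size_t`, so `%zu` should be used instead (as `listVersions` already does for `versions.size()`).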




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to