hunyadi-dev commented on a change in pull request #979: URL: https://github.com/apache/nifi-minifi-cpp/pull/979#discussion_r584728589
########## File path: extensions/azure/processors/PutAzureBlobStorage.cpp ########## @@ -0,0 +1,229 @@ +/** + * @file PutAzureBlobStorage.cpp + * PutAzureBlobStorage class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutAzureBlobStorage.h" + +#include <memory> +#include <string> + +#include "storage/AzureBlobStorage.h" +#include "controllerservices/AzureStorageCredentialsService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace azure { +namespace processors { + +const core::Property PutAzureBlobStorage::ContainerName( + core::PropertyBuilder::createProperty("Container Name") + ->withDescription("Name of the Azure storage container. 
In case of PutAzureBlobStorage processor, container can be created if it does not exist.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); +const core::Property PutAzureBlobStorage::AzureStorageCredentialsService( + core::PropertyBuilder::createProperty("Azure Storage Credentials Service") + ->withDescription("Name of the Azure Storage Credentials Service used to retrieve the connection string from.") + ->build()); +const core::Property PutAzureBlobStorage::StorageAccountName( + core::PropertyBuilder::createProperty("Storage Account Name") + ->withDescription("The storage account name.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::StorageAccountKey( + core::PropertyBuilder::createProperty("Storage Account Key") + ->withDescription("The storage account key. This is an admin-like password providing access to every container in this account. " + "It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::SASToken( + core::PropertyBuilder::createProperty("SAS Token") + ->withDescription("Shared Access Signature token. Specify either SAS Token (recommended) or Account Key.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::CommonStorageAccountEndpointSuffix( + core::PropertyBuilder::createProperty("Common Storage Account Endpoint Suffix") + ->withDescription("Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a " + "different suffix in certain circumstances (like Azure Stack or non-public Azure regions). ") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::ConnectionString( + core::PropertyBuilder::createProperty("Connection String") + ->withDescription("Connection string used to connect to Azure Storage service. 
This overrides all other set credential properties.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::Blob( + core::PropertyBuilder::createProperty("Blob") + ->withDescription("The filename of the blob.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); +const core::Property PutAzureBlobStorage::CreateContainer( + core::PropertyBuilder::createProperty("Create Container") + ->withDescription("Specifies whether to check if the container exists and to automatically create it if it does not. " + "Permission to list containers is required. If false, this check is not made, but the Put operation will " + "fail if the container does not exist.") + ->isRequired(true) + ->withDefaultValue<bool>(false) + ->build()); + +const core::Relationship PutAzureBlobStorage::Success("success", "All successfully processed FlowFiles are routed to this relationship"); +const core::Relationship PutAzureBlobStorage::Failure("failure", "Unsuccessful operations will be transferred to the failure relationship"); + +void PutAzureBlobStorage::initialize() { + // Set the supported properties + setSupportedProperties({ + ContainerName, + StorageAccountName, + StorageAccountKey, + SASToken, + CommonStorageAccountEndpointSuffix, + ConnectionString, + AzureStorageCredentialsService, + Blob, + CreateContainer + }); + // Set the supported relationships + setSupportedRelationships({ + Failure, + Success + }); +} + +void PutAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { + context->getProperty(CreateContainer.getName(), create_container_); +} + +std::string PutAzureBlobStorage::getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const { + std::string service_name; + if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) { + return ""; + } + 
+ std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name); + if (nullptr == service) { + logger_->log_error("Azure storage credentials service with name: '%s' could not be found", service_name.c_str()); + return ""; + } + + auto azure_credentials_service = std::dynamic_pointer_cast<minifi::azure::controllers::AzureStorageCredentialsService>(service); + if (!azure_credentials_service) { + logger_->log_error("Controller service with name: '%s' is not an Azure storage credentials service", service_name.c_str()); + return ""; + } + + return azure_credentials_service->getConnectionString(); +} + +std::string PutAzureBlobStorage::getAzureConnectionStringFromProperties( + const std::shared_ptr<core::ProcessContext> &context, + const std::shared_ptr<core::FlowFile> &flow_file) const { + azure::storage::AzureStorageCredentials credentials; + context->getProperty(StorageAccountName, credentials.storage_account_name, flow_file); + context->getProperty(StorageAccountKey, credentials.storage_account_key, flow_file); + context->getProperty(SASToken, credentials.sas_token, flow_file); + context->getProperty(CommonStorageAccountEndpointSuffix, credentials.endpoint_suffix, flow_file); + context->getProperty(ConnectionString, credentials.connection_string, flow_file); + return credentials.getConnectionString(); +} + +void PutAzureBlobStorage::createAzureStorageClient(const std::string &connection_string, const std::string &container_name) { + // When used in multithreaded environment make sure to use the azure_storage_mutex_ to lock the wrapper so the + // client is not reset with different configuration while another thread is using it. 
+ if (blob_storage_wrapper_ == nullptr) { + blob_storage_wrapper_ = minifi::utils::make_unique<storage::AzureBlobStorage>(connection_string, container_name); + } else { + blob_storage_wrapper_->resetClientIfNeeded(connection_string, container_name); + } +} + +std::string PutAzureBlobStorage::getConnectionString( + const std::shared_ptr<core::ProcessContext> &context, + const std::shared_ptr<core::FlowFile> &flow_file) const { + auto connection_string = getAzureConnectionStringFromProperties(context, flow_file); + if (!connection_string.empty()) { + return connection_string; + } + + return getConnectionStringFromControllerService(context); +} + +void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + logger_->log_debug("PutAzureBlobStorage onTrigger"); + std::shared_ptr<core::FlowFile> flow_file = session->get(); + if (!flow_file) { + return; + } + + auto connection_string = getConnectionString(context, flow_file); + if (connection_string.empty()) { + logger_->log_error("Connection string is empty!"); + session->transfer(flow_file, Failure); + return; + } + + std::string container_name; + if (!context->getProperty(ContainerName, container_name, flow_file) || container_name.empty()) { + logger_->log_error("Container Name is invalid or empty!"); + session->transfer(flow_file, Failure); + return; + } + + std::string blob_name; + if (!context->getProperty(Blob, blob_name, flow_file) || blob_name.empty()) { + logger_->log_error("Blob name is invalid or empty!"); + session->transfer(flow_file, Failure); + return; + } + + utils::optional<azure::storage::UploadBlobResult> upload_result; + { + std::lock_guard<std::mutex> lock(azure_storage_mutex_); + createAzureStorageClient(connection_string, container_name); + if (create_container_) { + blob_storage_wrapper_->createContainer(); + } + PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), *blob_storage_wrapper_, blob_name); 
+ session->read(flow_file, &callback); + upload_result = callback.getResult(); + } + + if (!upload_result) { + logger_->log_error("Failed to upload blob '%s' to Azure storage container '%s'", blob_name, container_name); + session->transfer(flow_file, Failure); + } else { Review comment: Minor, but adding the scope could be omitted if the unhappy path returned both here and in `createAzureStorageClient`. ########## File path: extensions/azure/processors/PutAzureBlobStorage.cpp ########## @@ -0,0 +1,229 @@ +/** + * @file PutAzureBlobStorage.cpp + * PutAzureBlobStorage class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutAzureBlobStorage.h" + +#include <memory> +#include <string> + +#include "storage/AzureBlobStorage.h" +#include "controllerservices/AzureStorageCredentialsService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace azure { +namespace processors { + +const core::Property PutAzureBlobStorage::ContainerName( + core::PropertyBuilder::createProperty("Container Name") + ->withDescription("Name of the Azure storage container. 
In case of PutAzureBlobStorage processor, container can be created if it does not exist.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); +const core::Property PutAzureBlobStorage::AzureStorageCredentialsService( + core::PropertyBuilder::createProperty("Azure Storage Credentials Service") + ->withDescription("Name of the Azure Storage Credentials Service used to retrieve the connection string from.") + ->build()); +const core::Property PutAzureBlobStorage::StorageAccountName( + core::PropertyBuilder::createProperty("Storage Account Name") + ->withDescription("The storage account name.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::StorageAccountKey( + core::PropertyBuilder::createProperty("Storage Account Key") + ->withDescription("The storage account key. This is an admin-like password providing access to every container in this account. " + "It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::SASToken( + core::PropertyBuilder::createProperty("SAS Token") + ->withDescription("Shared Access Signature token. Specify either SAS Token (recommended) or Account Key.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::CommonStorageAccountEndpointSuffix( + core::PropertyBuilder::createProperty("Common Storage Account Endpoint Suffix") + ->withDescription("Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a " + "different suffix in certain circumstances (like Azure Stack or non-public Azure regions). ") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::ConnectionString( + core::PropertyBuilder::createProperty("Connection String") + ->withDescription("Connection string used to connect to Azure Storage service. 
This overrides all other set credential properties.") + ->supportsExpressionLanguage(true) + ->build()); +const core::Property PutAzureBlobStorage::Blob( + core::PropertyBuilder::createProperty("Blob") + ->withDescription("The filename of the blob.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); +const core::Property PutAzureBlobStorage::CreateContainer( + core::PropertyBuilder::createProperty("Create Container") + ->withDescription("Specifies whether to check if the container exists and to automatically create it if it does not. " + "Permission to list containers is required. If false, this check is not made, but the Put operation will " + "fail if the container does not exist.") + ->isRequired(true) + ->withDefaultValue<bool>(false) + ->build()); + +const core::Relationship PutAzureBlobStorage::Success("success", "All successfully processed FlowFiles are routed to this relationship"); +const core::Relationship PutAzureBlobStorage::Failure("failure", "Unsuccessful operations will be transferred to the failure relationship"); + +void PutAzureBlobStorage::initialize() { + // Set the supported properties + setSupportedProperties({ + ContainerName, + StorageAccountName, + StorageAccountKey, + SASToken, + CommonStorageAccountEndpointSuffix, + ConnectionString, + AzureStorageCredentialsService, + Blob, + CreateContainer + }); + // Set the supported relationships + setSupportedRelationships({ + Failure, + Success + }); +} + +void PutAzureBlobStorage::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { + context->getProperty(CreateContainer.getName(), create_container_); +} + +std::string PutAzureBlobStorage::getConnectionStringFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const { + std::string service_name; + if (!context->getProperty(AzureStorageCredentialsService.getName(), service_name) || service_name.empty()) { + return ""; + } + 
+ std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(service_name); + if (nullptr == service) { + logger_->log_error("Azure storage credentials service with name: '%s' could not be found", service_name.c_str()); + return ""; + } + + auto azure_credentials_service = std::dynamic_pointer_cast<minifi::azure::controllers::AzureStorageCredentialsService>(service); + if (!azure_credentials_service) { + logger_->log_error("Controller service with name: '%s' is not an Azure storage credentials service", service_name.c_str()); + return ""; + } + + return azure_credentials_service->getConnectionString(); +} + +std::string PutAzureBlobStorage::getAzureConnectionStringFromProperties( + const std::shared_ptr<core::ProcessContext> &context, + const std::shared_ptr<core::FlowFile> &flow_file) const { + azure::storage::AzureStorageCredentials credentials; + context->getProperty(StorageAccountName, credentials.storage_account_name, flow_file); + context->getProperty(StorageAccountKey, credentials.storage_account_key, flow_file); + context->getProperty(SASToken, credentials.sas_token, flow_file); + context->getProperty(CommonStorageAccountEndpointSuffix, credentials.endpoint_suffix, flow_file); + context->getProperty(ConnectionString, credentials.connection_string, flow_file); + return credentials.getConnectionString(); +} + +void PutAzureBlobStorage::createAzureStorageClient(const std::string &connection_string, const std::string &container_name) { + // When used in multithreaded environment make sure to use the azure_storage_mutex_ to lock the wrapper so the + // client is not reset with different configuration while another thread is using it. 
+ if (blob_storage_wrapper_ == nullptr) { + blob_storage_wrapper_ = minifi::utils::make_unique<storage::AzureBlobStorage>(connection_string, container_name); + } else { + blob_storage_wrapper_->resetClientIfNeeded(connection_string, container_name); + } +} + +std::string PutAzureBlobStorage::getConnectionString( + const std::shared_ptr<core::ProcessContext> &context, + const std::shared_ptr<core::FlowFile> &flow_file) const { + auto connection_string = getAzureConnectionStringFromProperties(context, flow_file); + if (!connection_string.empty()) { + return connection_string; + } + + return getConnectionStringFromControllerService(context); +} + +void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + logger_->log_debug("PutAzureBlobStorage onTrigger"); + std::shared_ptr<core::FlowFile> flow_file = session->get(); + if (!flow_file) { + return; + } + + auto connection_string = getConnectionString(context, flow_file); + if (connection_string.empty()) { + logger_->log_error("Connection string is empty!"); + session->transfer(flow_file, Failure); + return; + } + + std::string container_name; + if (!context->getProperty(ContainerName, container_name, flow_file) || container_name.empty()) { + logger_->log_error("Container Name is invalid or empty!"); + session->transfer(flow_file, Failure); + return; + } + + std::string blob_name; + if (!context->getProperty(Blob, blob_name, flow_file) || blob_name.empty()) { + logger_->log_error("Blob name is invalid or empty!"); + session->transfer(flow_file, Failure); + return; + } + + utils::optional<azure::storage::UploadBlobResult> upload_result; + { + std::lock_guard<std::mutex> lock(azure_storage_mutex_); + createAzureStorageClient(connection_string, container_name); + if (create_container_) { + blob_storage_wrapper_->createContainer(); + } + PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), *blob_storage_wrapper_, blob_name); 
+ session->read(flow_file, &callback); + upload_result = callback.getResult(); + } + + if (!upload_result) { + logger_->log_error("Failed to upload blob '%s' to Azure storage container '%s'", blob_name, container_name); + session->transfer(flow_file, Failure); + } else { Review comment: Minor, but the `else` scope could be omitted if the unhappy path returned early, both here and in `createAzureStorageClient`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org