martinzink commented on a change in pull request #1178: URL: https://github.com/apache/nifi-minifi-cpp/pull/1178#discussion_r719326342
########## File path: extensions/azure/processors/PutAzureDataLakeStorage.cpp ########## @@ -72,19 +72,26 @@ void PutAzureDataLakeStorage::initialize() { } void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) { - connection_string_ = getConnectionStringFromControllerService(context); - if (connection_string_.empty()) { + auto credentials = getCredentialsFromControllerService(context); + if (!credentials) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid"); } + if ((!credentials->getUseManagedIdentityCredentials() && credentials->buildConnectionString().empty()) || + (credentials->getUseManagedIdentityCredentials() && credentials->getStorageAccountName().empty())) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service properties are not set or invalid"); + } + + credentials_ = *credentials; Review comment: This onSchedule also sets members but isnt protected by the mutex ########## File path: extensions/azure/processors/PutAzureBlobStorage.cpp ########## @@ -179,53 +232,43 @@ void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext> return; } - auto connection_string = getConnectionString(context, flow_file); - if (connection_string.empty()) { - logger_->log_error("Connection string is empty!"); - session->transfer(flow_file, Failure); - return; - } - - std::string container_name; - if (!context->getProperty(ContainerName, container_name, flow_file) || container_name.empty()) { - logger_->log_error("Container Name is invalid or empty!"); - session->transfer(flow_file, Failure); - return; - } - - std::string blob_name; - if (!context->getProperty(Blob, blob_name, flow_file) || blob_name.empty()) { - logger_->log_error("Blob name is invalid or empty!"); + auto params = buildAzureBlobStorageParameters(context, flow_file); + if (!params) { session->transfer(flow_file, Failure); return; } std::optional<storage::UploadBlobResult> upload_result; { // TODO(lordgamez): This can be removed after maximum allowed threads are implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566 + // When used in multithreaded environment make sure to use the azure_storage_mutex_ to lock the wrapper so the + // client is not reset with different configuration while another thread is using it. std::lock_guard<std::mutex> lock(azure_storage_mutex_); - createAzureStorageClient(connection_string, container_name); if (create_container_) { Review comment: onSchedule sets this create_container_ member but onSchedule is not protected by the azure_storage_mutex_ Could that cause unexpected behaviour in multithreaded enviroment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org