martinzink commented on a change in pull request #1178:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1178#discussion_r719326342



##########
File path: extensions/azure/processors/PutAzureDataLakeStorage.cpp
##########
@@ -72,19 +72,26 @@ void PutAzureDataLakeStorage::initialize() {
 }
 
 void PutAzureDataLakeStorage::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
-  connection_string_ = getConnectionStringFromControllerService(context);
-  if (connection_string_.empty()) {
+  auto credentials = getCredentialsFromControllerService(context);
+  if (!credentials) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials 
Service property missing or invalid");
   }
 
+  if ((!credentials->getUseManagedIdentityCredentials() && 
credentials->buildConnectionString().empty()) ||
+      (credentials->getUseManagedIdentityCredentials() && 
credentials->getStorageAccountName().empty())) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials 
Service properties are not set or invalid");
+  }
+
+  credentials_ = *credentials;

Review comment:
       This onSchedule also sets members but isn't protected by the mutex.

##########
File path: extensions/azure/processors/PutAzureBlobStorage.cpp
##########
@@ -179,53 +232,43 @@ void PutAzureBlobStorage::onTrigger(const 
std::shared_ptr<core::ProcessContext>
     return;
   }
 
-  auto connection_string = getConnectionString(context, flow_file);
-  if (connection_string.empty()) {
-    logger_->log_error("Connection string is empty!");
-    session->transfer(flow_file, Failure);
-    return;
-  }
-
-  std::string container_name;
-  if (!context->getProperty(ContainerName, container_name, flow_file) || 
container_name.empty()) {
-    logger_->log_error("Container Name is invalid or empty!");
-    session->transfer(flow_file, Failure);
-    return;
-  }
-
-  std::string blob_name;
-  if (!context->getProperty(Blob, blob_name, flow_file) || blob_name.empty()) {
-    logger_->log_error("Blob name is invalid or empty!");
+  auto params = buildAzureBlobStorageParameters(context, flow_file);
+  if (!params) {
     session->transfer(flow_file, Failure);
     return;
   }
 
   std::optional<storage::UploadBlobResult> upload_result;
   {
     // TODO(lordgamez): This can be removed after maximum allowed threads are 
implemented. See https://issues.apache.org/jira/browse/MINIFICPP-1566
+    // When used in multithreaded environment make sure to use the 
azure_storage_mutex_ to lock the wrapper so the
+    // client is not reset with different configuration while another thread 
is using it.
     std::lock_guard<std::mutex> lock(azure_storage_mutex_);
-    createAzureStorageClient(connection_string, container_name);
     if (create_container_) {

Review comment:
       onSchedule sets this create_container_ member, but onSchedule is not 
protected by the azure_storage_mutex_.
   Could that cause unexpected behaviour in a multithreaded environment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to