This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 320aed024f NIFI-10152 Storage client caching in Azure ADLS processors
320aed024f is described below

commit 320aed024f62ec87ccdd86e6f71e10d75eebd344
Author: Nandor Soma Abonyi <abonyis...@gmail.com>
AuthorDate: Thu Jun 23 01:25:45 2022 +0200

    NIFI-10152 Storage client caching in Azure ADLS processors
    
    This closes #6158.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../src/main/resources/META-INF/NOTICE             |   5 +
 .../nifi-azure-processors/pom.xml                  |   7 +-
 .../AbstractAzureDataLakeStorageProcessor.java     |  80 ++++---------
 .../azure/storage/ListAzureDataLakeStorage.java    |  14 ++-
 .../azure/storage/utils/AzureStorageUtils.java     |   4 +-
 .../utils/DataLakeServiceClientFactory.java        | 125 +++++++++++++++++++++
 .../azure/storage/ITListAzureDataLakeStorage.java  |  32 +++---
 .../utils/DataLakeServiceClientFactoryTest.java    |  84 ++++++++++++++
 .../azure/storage/ADLSCredentialsDetails.java      |  41 +++++++
 9 files changed, 310 insertions(+), 82 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
index 5faf77b19f..a4fb770588 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
@@ -195,6 +195,11 @@ The following binary components are provided under the 
Apache Software License v
         Reactive Streams Netty Driver
         Copyright 2020, Project Reactor
 
+  (ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:2.9.2 - 
https://github.com/ben-manes/caffeine)
+      The following NOTICE information applies:
+        Caffeine (caching library)
+        Copyright Ben Manes
+
 ************************
 Common Development and Distribution License 1.0
 ************************
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 667a7923ed..a6ad604524 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -135,6 +135,12 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.9.2</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
@@ -159,7 +165,6 @@
             <version>1.18.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
-
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-schema-registry-service-api</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 1a66dda500..41e677d0a8 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -16,18 +16,10 @@
  */
 package org.apache.nifi.processors.azure;
 
-import com.azure.core.credential.AccessToken;
-import com.azure.core.credential.TokenCredential;
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
-import com.azure.identity.ClientSecretCredential;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.identity.ManagedIdentityCredential;
-import com.azure.identity.ManagedIdentityCredentialBuilder;
-import com.azure.storage.common.StorageSharedKeyCredential;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -40,9 +32,10 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import 
org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
 import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
 import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
-import reactor.core.publisher.Mono;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,7 +44,6 @@ import java.util.Map;
 import java.util.Set;
 
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
-import static 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
 
 public abstract class AbstractAzureDataLakeStorageProcessor extends 
AbstractProcessor {
 
@@ -65,7 +57,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor 
extends AbstractProc
 
     public static final PropertyDescriptor FILESYSTEM = new 
PropertyDescriptor.Builder()
             .name("filesystem-name").displayName("Filesystem Name")
-            .description("Name of the Azure Storage File System. It is assumed 
to be already existing.")
+            .description("Name of the Azure Storage File System (also called 
Container). It is assumed to be already existing.")
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
@@ -103,65 +95,31 @@ public abstract class 
AbstractAzureDataLakeStorageProcessor extends AbstractProc
 
     public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
 
+    private DataLakeServiceClientFactory clientFactory;
+
     @Override
     public Set<Relationship> getRelationships() {
         return RELATIONSHIPS;
     }
 
-    public static DataLakeServiceClient getStorageClient(PropertyContext 
context, FlowFile flowFile) {
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientFactory = new DataLakeServiceClientFactory(getLogger(), 
AzureStorageUtils.getProxyOptions(context));
+    }
+
+    @OnStopped
+    public void onStopped() {
+        clientFactory = null;
+    }
+
+    public DataLakeServiceClient getStorageClient(PropertyContext context, 
FlowFile flowFile) {
         final Map<String, String> attributes = flowFile != null ? 
flowFile.getAttributes() : Collections.emptyMap();
 
         final ADLSCredentialsService credentialsService = 
context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
 
         final ADLSCredentialsDetails credentialsDetails = 
credentialsService.getCredentialsDetails(attributes);
 
-        final String accountName = credentialsDetails.getAccountName();
-        final String accountKey = credentialsDetails.getAccountKey();
-        final String sasToken = credentialsDetails.getSasToken();
-        final AccessToken accessToken = credentialsDetails.getAccessToken();
-        final String endpointSuffix = credentialsDetails.getEndpointSuffix();
-        final boolean useManagedIdentity = 
credentialsDetails.getUseManagedIdentity();
-        final String managedIdentityClientId = 
credentialsDetails.getManagedIdentityClientId();
-        final String servicePrincipalTenantId = 
credentialsDetails.getServicePrincipalTenantId();
-        final String servicePrincipalClientId = 
credentialsDetails.getServicePrincipalClientId();
-        final String servicePrincipalClientSecret = 
credentialsDetails.getServicePrincipalClientSecret();
-
-        final String endpoint = String.format("https://%s.%s", accountName, 
endpointSuffix);
-
-        final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new 
DataLakeServiceClientBuilder();
-        dataLakeServiceClientBuilder.endpoint(endpoint);
-
-        if (StringUtils.isNotBlank(accountKey)) {
-            final StorageSharedKeyCredential credential = new 
StorageSharedKeyCredential(accountName, accountKey);
-            dataLakeServiceClientBuilder.credential(credential);
-        } else if (StringUtils.isNotBlank(sasToken)) {
-            dataLakeServiceClientBuilder.sasToken(sasToken);
-        } else if (accessToken != null) {
-            final TokenCredential credential = tokenRequestContext -> 
Mono.just(accessToken);
-            dataLakeServiceClientBuilder.credential(credential);
-        } else if (useManagedIdentity) {
-            final ManagedIdentityCredential misCredential = new 
ManagedIdentityCredentialBuilder()
-                    .clientId(managedIdentityClientId)
-                    .build();
-            dataLakeServiceClientBuilder.credential(misCredential);
-        } else if (StringUtils.isNoneBlank(servicePrincipalTenantId, 
servicePrincipalClientId, servicePrincipalClientSecret)) {
-            final ClientSecretCredential credential = new 
ClientSecretCredentialBuilder()
-                    .tenantId(servicePrincipalTenantId)
-                    .clientId(servicePrincipalClientId)
-                    .clientSecret(servicePrincipalClientSecret)
-                    .build();
-            dataLakeServiceClientBuilder.credential(credential);
-        } else {
-            throw new IllegalArgumentException("No valid credentials were 
provided");
-        }
-
-        final NettyAsyncHttpClientBuilder nettyClientBuilder = new 
NettyAsyncHttpClientBuilder();
-        nettyClientBuilder.proxy(getProxyOptions(context));
-
-        final HttpClient nettyClient = nettyClientBuilder.build();
-        dataLakeServiceClientBuilder.httpClient(nettyClient);
-
-        final DataLakeServiceClient storageClient = 
dataLakeServiceClientBuilder.buildClient();
+        final DataLakeServiceClient storageClient = 
clientFactory.getStorageClient(credentialsDetails);
 
         return storageClient;
     }
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index 24f51b5ae2..601a7e285c 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -43,7 +43,10 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import 
org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -66,7 +69,6 @@ import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProce
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
 import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
-import static 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
@@ -170,6 +172,8 @@ public class ListAzureDataLakeStorage extends 
AbstractListAzureProcessor<ADLSFil
     private volatile Pattern filePattern;
     private volatile Pattern pathPattern;
 
+    private DataLakeServiceClientFactory clientFactory;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
@@ -179,12 +183,14 @@ public class ListAzureDataLakeStorage extends 
AbstractListAzureProcessor<ADLSFil
     public void onScheduled(final ProcessContext context) {
         filePattern = getPattern(context, FILE_FILTER);
         pathPattern = getPattern(context, PATH_FILTER);
+        clientFactory = new DataLakeServiceClientFactory(getLogger(), 
AzureStorageUtils.getProxyOptions(context));
     }
 
     @OnStopped
     public void onStopped() {
         filePattern = null;
         pathPattern = null;
+        clientFactory = null;
     }
 
     @Override
@@ -264,7 +270,11 @@ public class ListAzureDataLakeStorage extends 
AbstractListAzureProcessor<ADLSFil
             final Pattern filePattern = listingMode == ListingMode.EXECUTION ? 
this.filePattern : getPattern(context, FILE_FILTER);
             final Pattern pathPattern = listingMode == ListingMode.EXECUTION ? 
this.pathPattern : getPattern(context, PATH_FILTER);
 
-            final DataLakeServiceClient storageClient = 
getStorageClient(context, null);
+            final ADLSCredentialsService credentialsService = 
context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
+
+            final ADLSCredentialsDetails credentialsDetails = 
credentialsService.getCredentialsDetails(Collections.emptyMap());
+
+            final DataLakeServiceClient storageClient = 
clientFactory.getStorageClient(credentialsDetails);
             final DataLakeFileSystemClient fileSystemClient = 
storageClient.getFileSystemClient(fileSystem);
 
             final ListPathsOptions options = new ListPathsOptions();
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 654db1469f..5f17fbd43e 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -321,7 +321,7 @@ public final class AzureStorageUtils {
      *
      * Creates the {@link ProxyOptions proxy options} that {@link HttpClient} 
will use.
      *
-     * @param propertyContext is sed to supply Proxy configurations
+     * @param propertyContext to supply Proxy configurations
      * @return {@link ProxyOptions proxy options}, null if Proxy is not set
      */
     public static ProxyOptions getProxyOptions(final PropertyContext 
propertyContext) {
@@ -342,7 +342,7 @@ public final class AzureStorageUtils {
             return proxyOptions;
         }
 
-         return null;
+        return null;
     }
 
     private static ProxyOptions.Type getProxyType(ProxyConfiguration 
proxyConfiguration) {
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
new file mode 100644
index 0000000000..672618beb8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.identity.ClientSecretCredential;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.ManagedIdentityCredential;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
+import reactor.core.publisher.Mono;
+
+public class DataLakeServiceClientFactory {
+
+    private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
+
+    private final ComponentLog logger;
+    private final ProxyOptions proxyOptions;
+
+    private final Cache<ADLSCredentialsDetails, DataLakeServiceClient> 
clientCache;
+
+    public DataLakeServiceClientFactory(ComponentLog logger, ProxyOptions 
proxyOptions) {
+        this.logger = logger;
+        this.proxyOptions = proxyOptions;
+        this.clientCache = createCache();
+    }
+
+    private Cache<ADLSCredentialsDetails, DataLakeServiceClient> createCache() 
{
+        // Beware! By default, Caffeine does not perform cleanup and evict 
values
+        // "automatically" or instantly after a value expires. Because of that 
it
+        // can happen that there are more elements in the cache than the 
maximum size.
+        // See: https://github.com/ben-manes/caffeine/wiki/Cleanup
+        return Caffeine.newBuilder()
+                .maximumSize(STORAGE_CLIENT_CACHE_SIZE)
+                .build();
+    }
+
+    /**
+     * Retrieves a {@link DataLakeServiceClient}
+     *
+     * @param credentialsDetails used for caching because it can contain 
properties that are results of an expression
+     * @return DataLakeServiceClient
+     */
+    public DataLakeServiceClient getStorageClient(ADLSCredentialsDetails 
credentialsDetails) {
+        return clientCache.get(credentialsDetails, __ -> {
+            logger.debug("DataLakeServiceClient is not found in the cache with 
the given credentials. Creating it.");
+            return createStorageClient(credentialsDetails, proxyOptions);
+        });
+    }
+
+    private static DataLakeServiceClient 
createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions 
proxyOptions) {
+        final String accountName = credentialsDetails.getAccountName();
+        final String accountKey = credentialsDetails.getAccountKey();
+        final String sasToken = credentialsDetails.getSasToken();
+        final AccessToken accessToken = credentialsDetails.getAccessToken();
+        final String endpointSuffix = credentialsDetails.getEndpointSuffix();
+        final boolean useManagedIdentity = 
credentialsDetails.getUseManagedIdentity();
+        final String managedIdentityClientId = 
credentialsDetails.getManagedIdentityClientId();
+        final String servicePrincipalTenantId = 
credentialsDetails.getServicePrincipalTenantId();
+        final String servicePrincipalClientId = 
credentialsDetails.getServicePrincipalClientId();
+        final String servicePrincipalClientSecret = 
credentialsDetails.getServicePrincipalClientSecret();
+
+        final String endpoint = String.format("https://%s.%s", accountName, 
endpointSuffix);
+
+        final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new 
DataLakeServiceClientBuilder();
+        dataLakeServiceClientBuilder.endpoint(endpoint);
+
+        if (StringUtils.isNotBlank(accountKey)) {
+            final StorageSharedKeyCredential credential = new 
StorageSharedKeyCredential(accountName, accountKey);
+            dataLakeServiceClientBuilder.credential(credential);
+        } else if (StringUtils.isNotBlank(sasToken)) {
+            dataLakeServiceClientBuilder.sasToken(sasToken);
+        } else if (accessToken != null) {
+            final TokenCredential credential = tokenRequestContext -> 
Mono.just(accessToken);
+            dataLakeServiceClientBuilder.credential(credential);
+        } else if (useManagedIdentity) {
+            final ManagedIdentityCredential misCredential = new 
ManagedIdentityCredentialBuilder()
+                    .clientId(managedIdentityClientId)
+                    .build();
+            dataLakeServiceClientBuilder.credential(misCredential);
+        } else if (StringUtils.isNoneBlank(servicePrincipalTenantId, 
servicePrincipalClientId, servicePrincipalClientSecret)) {
+            final ClientSecretCredential credential = new 
ClientSecretCredentialBuilder()
+                    .tenantId(servicePrincipalTenantId)
+                    .clientId(servicePrincipalClientId)
+                    .clientSecret(servicePrincipalClientSecret)
+                    .build();
+            dataLakeServiceClientBuilder.credential(credential);
+        } else {
+            throw new IllegalArgumentException("No valid credentials were 
provided");
+        }
+
+        final NettyAsyncHttpClientBuilder nettyClientBuilder = new 
NettyAsyncHttpClientBuilder();
+        nettyClientBuilder.proxy(proxyOptions);
+
+        final HttpClient nettyClient = nettyClientBuilder.build();
+        dataLakeServiceClientBuilder.httpClient(nettyClient);
+
+        return dataLakeServiceClientBuilder.buildClient();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
index 978fe9433f..26ee97ee65 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -98,7 +98,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListRootRecursive() throws Exception {
+    public void testListRootRecursive() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
 
         runProcessor();
@@ -131,7 +131,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListRootNonRecursive() throws Exception {
+    public void testListRootNonRecursive() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, 
"false");
 
@@ -152,7 +152,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListSubdirectoryRecursive() throws Exception {
+    public void testListSubdirectoryRecursive() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
 
         runProcessor();
@@ -173,7 +173,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListSubdirectoryNonRecursive() throws Exception {
+    public void testListSubdirectoryNonRecursive() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
         runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, 
"false");
 
@@ -194,7 +194,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListWithFileFilter() throws Exception {
+    public void testListWithFileFilter() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
 
@@ -218,7 +218,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListWithFileFilterWithEL() throws Exception {
+    public void testListWithFileFilterWithEL() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, 
".*file${suffix}$");
         runner.setVariable("suffix", "1.*");
@@ -244,7 +244,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListRootWithPathFilter() throws Exception {
+    public void testListRootWithPathFilter() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
 
@@ -267,7 +267,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListRootWithPathFilterWithEL() throws Exception {
+    public void testListRootWithPathFilterWithEL() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, 
"${prefix}${suffix}");
         runner.setVariable("prefix", "^dir");
@@ -294,7 +294,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListSubdirectoryWithPathFilter() throws Exception {
+    public void testListSubdirectoryWithPathFilter() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir1");
         runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
 
@@ -315,7 +315,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListRootWithFileAndPathFilter() throws Exception {
+    public void testListRootWithFileAndPathFilter() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
         runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
@@ -339,7 +339,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListEmptyDirectory() throws Exception {
+    public void testListEmptyDirectory() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"dir3");
 
         runProcessor();
@@ -401,7 +401,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListWithMinAge() throws Exception {
+    public void testListWithMinAge() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
 
@@ -422,7 +422,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListWithMaxAge() throws Exception {
+    public void testListWithMaxAge() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
 
@@ -447,7 +447,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListWithMinSize() throws Exception {
+    public void testListWithMinSize() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
 
@@ -471,7 +471,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testListWithMaxSize() throws Exception {
+    public void testListWithMaxSize() {
         runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, 
"");
         runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
 
@@ -496,7 +496,7 @@ public class ITListAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         runner.run();
     }
 
-    private void assertSuccess(String... testFilePaths) throws Exception {
+    private void assertSuccess(String... testFilePaths) {
         runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 
testFilePaths.length);
 
         Map<String, TestFile> expectedFiles = new HashMap<>(testFiles);
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java
new file mode 100644
index 0000000000..8576553d28
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+@ExtendWith(MockitoExtension.class)
+class DataLakeServiceClientFactoryTest {
+
+    @Mock
+    private ComponentLog logger;
+
+    @Test
+    void testThatServiceClientIsCachedByCredentials() {
+        final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null);
+
+        final ADLSCredentialsDetails credentials = createCredentialDetails("account");
+
+        final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentials);
+        final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentials);
+
+        assertSame(clientOne, clientTwo);
+    }
+
+    @Test
+    void testThatDifferentServiceClientIsReturnedForDifferentCredentials() {
+        final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null);
+
+        final ADLSCredentialsDetails credentialsOne = createCredentialDetails("accountOne");
+        final ADLSCredentialsDetails credentialsTwo = createCredentialDetails("accountTwo");
+
+        final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
+        final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
+
+        assertNotSame(clientOne, clientTwo);
+    }
+
+    @Test
+    void testThatCachedClientIsReturnedAfterDifferentClientIsCreated() {
+        final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null);
+
+        final ADLSCredentialsDetails credentialsOne = createCredentialDetails("accountOne");
+        final ADLSCredentialsDetails credentialsTwo = createCredentialDetails("accountTwo");
+        final ADLSCredentialsDetails credentialsThree = createCredentialDetails("accountOne");
+
+        final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
+        final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
+        final DataLakeServiceClient clientThree = clientFactory.getStorageClient(credentialsThree);
+
+        assertNotSame(clientOne, clientTwo);
+        assertSame(clientOne, clientThree);
+    }
+
+    private ADLSCredentialsDetails createCredentialDetails(String accountName) {
+        return ADLSCredentialsDetails.Builder.newBuilder()
+                .setAccountName(accountName)
+                .setAccountKey("accountKey")
+                .setEndpointSuffix("dfs.core.windows.net")
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
index eb3b1237c0..0a831161e5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java
@@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
 
 import com.azure.core.credential.AccessToken;
 
+import java.util.Objects;
+
 public class ADLSCredentialsDetails {
     private final String accountName;
 
@@ -98,6 +100,45 @@ public class ADLSCredentialsDetails {
         return servicePrincipalClientSecret;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ADLSCredentialsDetails that = (ADLSCredentialsDetails) o;
+        return useManagedIdentity == that.useManagedIdentity
+                && Objects.equals(accountName, that.accountName)
+                && Objects.equals(accountKey, that.accountKey)
+                && Objects.equals(sasToken, that.sasToken)
+                && Objects.equals(endpointSuffix, that.endpointSuffix)
+                && Objects.equals(accessToken, that.accessToken)
+                && Objects.equals(managedIdentityClientId, that.managedIdentityClientId)
+                && Objects.equals(servicePrincipalTenantId, that.servicePrincipalTenantId)
+                && Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId)
+                && Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                accountName,
+                accountKey,
+                sasToken,
+                endpointSuffix,
+                accessToken,
+                useManagedIdentity,
+                managedIdentityClientId,
+                servicePrincipalTenantId,
+                servicePrincipalClientId,
+                servicePrincipalClientSecret
+        );
+    }
+
     public static class Builder {
         private String accountName;
         private String accountKey;


Reply via email to