exceptionfactory commented on code in PR #11100:
URL: https://github.com/apache/nifi/pull/11100#discussion_r3034330701


##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java:
##########
@@ -33,16 +36,29 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class ParameterProviderSecretsManager implements SecretsManager {
     private static final Logger logger = 
LoggerFactory.getLogger(ParameterProviderSecretsManager.class);
+    private static final String DEFAULT_CACHE_DURATION = "5 mins";
+
     private FlowManager flowManager;
+    private Duration cacheDuration;
+    private final Map<String, CachedSecret> secretCache = new 
ConcurrentHashMap<>();
+
+    private record CachedSecret(Secret secret, long timestampNanos) {
+    }
 
     @Override
     public void initialize(final SecretsManagerInitializationContext 
initializationContext) {
         this.flowManager = initializationContext.getFlowManager();
+
+        final String cacheDurationValue = 
initializationContext.getProperty(NiFiProperties.SECRETS_MANAGER_CACHE_DURATION);
+        final String effectiveDuration = cacheDurationValue != null ? 
cacheDurationValue : DEFAULT_CACHE_DURATION;

Review Comment:
   ```suggestion
           final String effectiveDuration = cacheDurationValue == null ? 
DEFAULT_CACHE_DURATION : cacheDurationValue;
   ```



##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java:
##########
@@ -136,9 +177,77 @@ public Map<SecretReference, Secret> getSecrets(final 
Set<SecretReference> secret
         return secrets;
     }
 
-    private SecretProvider findProvider(final SecretReference secretReference) 
{
+    private Map<SecretReference, Secret> fetchSecretsWithCache(final 
Set<SecretReference> secretReferences) {
         final Set<SecretProvider> providers = getSecretProviders();
+        final Map<SecretReference, Secret> results = new HashMap<>();
+
+        // Partition references into cache hits vs. misses that need fetching
+        final Map<SecretProvider, Set<SecretReference>> uncachedByProvider = 
new HashMap<>();
+        for (final SecretReference secretReference : secretReferences) {
+            final String fqn = secretReference.getFullyQualifiedName();
+
+            if (fqn != null) {
+                final CachedSecret cached = secretCache.get(fqn);
+                if (cached != null && !isExpired(cached)) {
+                    logger.debug("Cache hit for secret [{}]", fqn);
+                    results.put(secretReference, cached.secret());
+                    continue;
+                }
+            }
+
+            final SecretProvider provider = findProvider(secretReference, 
providers);
+            uncachedByProvider.computeIfAbsent(provider, k -> new 
HashSet<>()).add(secretReference);
+        }
+
+        // Batch fetch uncached secrets grouped by provider
+        for (final Map.Entry<SecretProvider, Set<SecretReference>> entry : 
uncachedByProvider.entrySet()) {
+            final SecretProvider provider = entry.getKey();
+            final Set<SecretReference> references = entry.getValue();
+
+            if (provider == null) {
+                for (final SecretReference secretReference : references) {
+                    results.put(secretReference, null);
+                }
+                continue;
+            }
+
+            final List<String> secretNames = new ArrayList<>();
+            references.forEach(ref -> 
secretNames.add(ref.getFullyQualifiedName()));
+            final List<Secret> retrievedSecrets = 
provider.getSecrets(secretNames);
+            final Map<String, Secret> secretsByName = retrievedSecrets.stream()
+                .collect(Collectors.toMap(Secret::getFullyQualifiedName, 
Function.identity()));
+
+            for (final SecretReference secretReference : references) {
+                final String fqn = secretReference.getFullyQualifiedName();
+                final Secret secret = secretsByName.get(fqn);
+                results.put(secretReference, secret);
+
+                if (secret != null && fqn != null) {
+                    cacheSecret(fqn, secret);
+                }
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    public void invalidateCache() {
+        secretCache.clear();
+        logger.debug("Secret cache invalidated");
+    }
+
+    private boolean isExpired(final CachedSecret cached) {
+        return Duration.ofNanos(System.nanoTime() - 
cached.timestampNanos()).compareTo(cacheDuration) >= 0;

Review Comment:
   Recommend breaking this out to multiple lines to declare intermediate 
variables for easier reading. It should also be possible to compare one Duration 
to another.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java:
##########
@@ -37,4 +37,10 @@ public interface SecretsManager {
 
     Map<SecretReference, Secret> getSecrets(Set<SecretReference> 
secretReferences);
 
+    /**
+     * Invalidates any cached secret data, forcing the next access to fetch 
fresh data
+     * from the underlying secret providers.
+     */
+    void invalidateCache();

Review Comment:
   Is it necessary to add a `default` implementation for this method?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java:
##########
@@ -370,5 +379,199 @@ public void 
testGetSecretProvidersFiltersOutInvalidProviders() {
         assertEquals(1, secretProviders.size());
         assertEquals("valid-id", 
secretProviders.iterator().next().getProviderId());
     }
+
+    @Test
+    public void testZeroDurationDisablesCaching() {
+        final ParameterProviderNode providerNode = 
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+            createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, 
SECRET_1_VALUE));
+        final ParameterProviderSecretsManager manager = 
createManagerWithCacheDuration("0 sec", providerNode);
+
+        final SecretReference reference = createSecretReference(PROVIDER_1_ID, 
null, SECRET_1_NAME);
+
+        manager.getSecret(reference);
+        manager.getSecret(reference);
+
+        verify(providerNode, times(2)).fetchParameterValues(anyList());
+    }
+
+    @Test
+    public void testSecretCacheReturnsFromCacheWithinDuration() {
+        final ParameterProviderNode providerNode = 
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+            createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, 
SECRET_1_VALUE));
+        final ParameterProviderSecretsManager manager = 
createManagerWithCacheDuration("5 mins", providerNode);
+
+        final SecretReference reference = createSecretReference(PROVIDER_1_ID, 
null, SECRET_1_NAME);
+
+        final Optional<Secret> first = manager.getSecret(reference);
+        final Optional<Secret> second = manager.getSecret(reference);
+
+        assertTrue(first.isPresent());
+        assertTrue(second.isPresent());
+        assertEquals(SECRET_1_VALUE, first.get().getValue());
+        assertEquals(SECRET_1_VALUE, second.get().getValue());
+
+        verify(providerNode, times(1)).fetchParameterValues(anyList());
+    }
+
+    @Test
+    public void testSecretCacheRefreshesAfterDurationExpiry() throws 
InterruptedException {
+        final ParameterProviderNode providerNode = 
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+            createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, 
SECRET_1_VALUE));
+        final ParameterProviderSecretsManager manager = 
createManagerWithCacheDuration("100 ms", providerNode);
+
+        final SecretReference reference = createSecretReference(PROVIDER_1_ID, 
null, SECRET_1_NAME);
+
+        manager.getSecret(reference);
+        verify(providerNode, times(1)).fetchParameterValues(anyList());
+
+        Thread.sleep(200);
+
+        manager.getSecret(reference);
+        verify(providerNode, times(2)).fetchParameterValues(anyList());
+    }
+
+    @Test
+    public void testInvalidateCacheForcesSecretRefresh() {
+        final ParameterProviderNode providerNode = 
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+            createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, 
SECRET_1_VALUE));
+        final ParameterProviderSecretsManager manager = 
createManagerWithCacheDuration("5 mins", providerNode);

Review Comment:
   Recommend declaring a static variable and reusing for this default `5 mins` 
value.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java:
##########
@@ -23,4 +23,12 @@ public interface SecretsManagerInitializationContext {
 
     FlowManager getFlowManager();
 
+    /**
+     * Returns the value of the given configuration property, or {@code null} 
if the property is not set.
+     *
+     * @param key the property key
+     * @return the property value, or {@code null}
+     */
+    String getProperty(String key);

Review Comment:
   Recommend naming this `getApplicationProperty()` for clarity, since it 
sources from NiFi application properties.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to