exceptionfactory commented on code in PR #11100:
URL: https://github.com/apache/nifi/pull/11100#discussion_r3034330701
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java:
##########
@@ -33,16 +36,29 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ParameterProviderSecretsManager implements SecretsManager {
private static final Logger logger =
LoggerFactory.getLogger(ParameterProviderSecretsManager.class);
+ private static final String DEFAULT_CACHE_DURATION = "5 mins";
+
private FlowManager flowManager;
+ private Duration cacheDuration;
+ private final Map<String, CachedSecret> secretCache = new
ConcurrentHashMap<>();
+
+ private record CachedSecret(Secret secret, long timestampNanos) {
+ }
@Override
public void initialize(final SecretsManagerInitializationContext
initializationContext) {
this.flowManager = initializationContext.getFlowManager();
+
+ final String cacheDurationValue =
initializationContext.getProperty(NiFiProperties.SECRETS_MANAGER_CACHE_DURATION);
+ final String effectiveDuration = cacheDurationValue != null ?
cacheDurationValue : DEFAULT_CACHE_DURATION;
Review Comment:
```suggestion
final String effectiveDuration = cacheDurationValue == null ?
DEFAULT_CACHE_DURATION : cacheDurationValue;
```
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java:
##########
@@ -136,9 +177,77 @@ public Map<SecretReference, Secret> getSecrets(final
Set<SecretReference> secret
return secrets;
}
- private SecretProvider findProvider(final SecretReference secretReference)
{
+ private Map<SecretReference, Secret> fetchSecretsWithCache(final
Set<SecretReference> secretReferences) {
final Set<SecretProvider> providers = getSecretProviders();
+ final Map<SecretReference, Secret> results = new HashMap<>();
+
+ // Partition references into cache hits vs. misses that need fetching
+ final Map<SecretProvider, Set<SecretReference>> uncachedByProvider =
new HashMap<>();
+ for (final SecretReference secretReference : secretReferences) {
+ final String fqn = secretReference.getFullyQualifiedName();
+
+ if (fqn != null) {
+ final CachedSecret cached = secretCache.get(fqn);
+ if (cached != null && !isExpired(cached)) {
+ logger.debug("Cache hit for secret [{}]", fqn);
+ results.put(secretReference, cached.secret());
+ continue;
+ }
+ }
+
+ final SecretProvider provider = findProvider(secretReference,
providers);
+ uncachedByProvider.computeIfAbsent(provider, k -> new
HashSet<>()).add(secretReference);
+ }
+
+ // Batch fetch uncached secrets grouped by provider
+ for (final Map.Entry<SecretProvider, Set<SecretReference>> entry :
uncachedByProvider.entrySet()) {
+ final SecretProvider provider = entry.getKey();
+ final Set<SecretReference> references = entry.getValue();
+
+ if (provider == null) {
+ for (final SecretReference secretReference : references) {
+ results.put(secretReference, null);
+ }
+ continue;
+ }
+
+ final List<String> secretNames = new ArrayList<>();
+ references.forEach(ref ->
secretNames.add(ref.getFullyQualifiedName()));
+ final List<Secret> retrievedSecrets =
provider.getSecrets(secretNames);
+ final Map<String, Secret> secretsByName = retrievedSecrets.stream()
+ .collect(Collectors.toMap(Secret::getFullyQualifiedName,
Function.identity()));
+
+ for (final SecretReference secretReference : references) {
+ final String fqn = secretReference.getFullyQualifiedName();
+ final Secret secret = secretsByName.get(fqn);
+ results.put(secretReference, secret);
+
+ if (secret != null && fqn != null) {
+ cacheSecret(fqn, secret);
+ }
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ public void invalidateCache() {
+ secretCache.clear();
+ logger.debug("Secret cache invalidated");
+ }
+
+ private boolean isExpired(final CachedSecret cached) {
+ return Duration.ofNanos(System.nanoTime() -
cached.timestampNanos()).compareTo(cacheDuration) >= 0;
Review Comment:
Recommend breaking this out into multiple lines and declaring intermediate
variables for easier reading. It should also be possible to compare one Duration
to another.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java:
##########
@@ -37,4 +37,10 @@ public interface SecretsManager {
Map<SecretReference, Secret> getSecrets(Set<SecretReference>
secretReferences);
+ /**
+ * Invalidates any cached secret data, forcing the next access to fetch
fresh data
+ * from the underlying secret providers.
+ */
+ void invalidateCache();
Review Comment:
Is it necessary to add a `default` implementation for this method?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java:
##########
@@ -370,5 +379,199 @@ public void
testGetSecretProvidersFiltersOutInvalidProviders() {
assertEquals(1, secretProviders.size());
assertEquals("valid-id",
secretProviders.iterator().next().getProviderId());
}
+
+ @Test
+ public void testZeroDurationDisablesCaching() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration("0 sec", providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ manager.getSecret(reference);
+ manager.getSecret(reference);
+
+ verify(providerNode, times(2)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testSecretCacheReturnsFromCacheWithinDuration() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration("5 mins", providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ final Optional<Secret> first = manager.getSecret(reference);
+ final Optional<Secret> second = manager.getSecret(reference);
+
+ assertTrue(first.isPresent());
+ assertTrue(second.isPresent());
+ assertEquals(SECRET_1_VALUE, first.get().getValue());
+ assertEquals(SECRET_1_VALUE, second.get().getValue());
+
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testSecretCacheRefreshesAfterDurationExpiry() throws
InterruptedException {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration("100 ms", providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ manager.getSecret(reference);
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+
+ Thread.sleep(200);
+
+ manager.getSecret(reference);
+ verify(providerNode, times(2)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testInvalidateCacheForcesSecretRefresh() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration("5 mins", providerNode);
Review Comment:
Recommend declaring a static variable and reusing it for this default `5 mins`
value.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java:
##########
@@ -23,4 +23,12 @@ public interface SecretsManagerInitializationContext {
FlowManager getFlowManager();
+ /**
+ * Returns the value of the given configuration property, or {@code null}
if the property is not set.
+ *
+ * @param key the property key
+ * @return the property value, or {@code null}
+ */
+ String getProperty(String key);
Review Comment:
Recommend naming this `getApplicationProperty()` for clarity, since it
sources from NiFi application properties.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]