This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 4b691b133b NIFI-10192 Caffeine cache schema request for reuse 4b691b133b is described below commit 4b691b133b231cdec04290afaf1fe433b6047717 Author: Aerilym <aerilym...@gmail.com> AuthorDate: Fri Sep 9 12:47:49 2022 +1000 NIFI-10192 Caffeine cache schema request for reuse This closes #6364 Signed-off-by: Mike Thomsen <mthom...@apache.org> --- .../nifi/processors/standard/LookupRecord.java | 43 +++++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 7301c26f63..7e4285ad2f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -17,6 +17,8 @@ package org.apache.nifi.processors.standard; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -45,6 +47,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathResult; @@ -194,6 +197,18 @@ public class LookupRecord extends AbstractProcessor { .required(true) .build(); + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("record-path-lookup-miss-result-cache-size") + .displayName("Cache Size") + .description("Specifies how many lookup values/records should be cached." + + "Setting this property to zero means no caching will be done and the table will be queried for each lookup value in each record. If the lookup " + + "table changes often or the most recent data must be retrieved, do not use the cache.") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("0") + .required(true) + .build(); + static final Relationship REL_MATCHED = new Relationship.Builder() .name("matched") .description("All records for which the lookup returns a value will be routed to this relationship") @@ -238,6 +253,7 @@ public class LookupRecord extends AbstractProcessor { properties.add(ROUTING_STRATEGY); properties.add(RESULT_CONTENTS); properties.add(REPLACEMENT_STRATEGY); + properties.add(CACHE_SIZE); return properties; } @@ -451,7 +467,7 @@ public class LookupRecord extends AbstractProcessor { if (isInPlaceReplacement) { return new InPlaceReplacementStrategy(); } else { - return new RecordPathReplacementStrategy(); + return new RecordPathReplacementStrategy(context); } } @@ -536,6 +552,19 @@ public class LookupRecord extends AbstractProcessor { private class RecordPathReplacementStrategy implements ReplacementStrategy { private int lookupCount = 0; + private volatile Cache<Map<String, Object>, Optional<?>> cache; + + public RecordPathReplacementStrategy(ProcessContext context) { + + final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger(); + + if (this.cache == null || cacheSize > 0) { + this.cache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .build(); + } + } + @Override public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) { lookupCount++; @@ -548,8 +577,15 @@ public class LookupRecord extends AbstractProcessor { final FlowFile flowFile = lookupContext.getOriginalFlowFile(); final Optional<?> lookupValueOption; + final Optional<?> lookupValueCacheOption; + try { - lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes()); + lookupValueCacheOption = (Optional<?>) cache.get(lookupCoordinates, k -> null); + if (lookupValueCacheOption == null) { + lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes()); + } else { + lookupValueOption = lookupValueCacheOption; + } } catch (final Exception e) { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } @@ -634,6 +670,9 @@ public class LookupRecord extends AbstractProcessor { } final Optional<?> lookupResult = lookupService.lookup(lookupCoordinates, flowFileAttributes); + + cache.put(lookupCoordinates, lookupResult); + if (!lookupResult.isPresent()) { continue; }