This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b691b133b NIFI-10192 Caffeine cache schema request for reuse
4b691b133b is described below

commit 4b691b133b231cdec04290afaf1fe433b6047717
Author: Aerilym <aerilym...@gmail.com>
AuthorDate: Fri Sep 9 12:47:49 2022 +1000

    NIFI-10192 Caffeine cache schema request for reuse
    
    This closes #6364
    
    Signed-off-by: Mike Thomsen <mthom...@apache.org>
---
 .../nifi/processors/standard/LookupRecord.java     | 43 +++++++++++++++++++++-
 1 file changed, 41 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 7301c26f63..7e4285ad2f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.processors.standard;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -45,6 +47,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
@@ -194,6 +197,18 @@ public class LookupRecord extends AbstractProcessor {
         .required(true)
         .build();
 
+    static final PropertyDescriptor CACHE_SIZE = new 
PropertyDescriptor.Builder()
+        .name("record-path-lookup-miss-result-cache-size")
+        .displayName("Cache Size")
+        .description("Specifies how many lookup values/records should be 
cached."
+                + "Setting this property to zero means no caching will be done 
and the table will be queried for each lookup value in each record. If the 
lookup "
+                + "table changes often or the most recent data must be 
retrieved, do not use the cache.")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .defaultValue("0")
+        .required(true)
+        .build();
+
     static final Relationship REL_MATCHED = new Relationship.Builder()
         .name("matched")
         .description("All records for which the lookup returns a value will be 
routed to this relationship")
@@ -238,6 +253,7 @@ public class LookupRecord extends AbstractProcessor {
         properties.add(ROUTING_STRATEGY);
         properties.add(RESULT_CONTENTS);
         properties.add(REPLACEMENT_STRATEGY);
+        properties.add(CACHE_SIZE);
         return properties;
     }
 
@@ -451,7 +467,7 @@ public class LookupRecord extends AbstractProcessor {
         if (isInPlaceReplacement) {
             return new InPlaceReplacementStrategy();
         } else {
-            return new RecordPathReplacementStrategy();
+            return new RecordPathReplacementStrategy(context);
         }
     }
 
@@ -536,6 +552,19 @@ public class LookupRecord extends AbstractProcessor {
     private class RecordPathReplacementStrategy implements ReplacementStrategy 
{
         private int lookupCount = 0;
 
+        private volatile Cache<Map<String, Object>, Optional<?>> cache;
+
+        public RecordPathReplacementStrategy(ProcessContext context) {
+
+            final int cacheSize = 
context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+
+            if (this.cache == null || cacheSize > 0) {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .build();
+            }
+        }
+
         @Override
         public Set<Relationship> lookup(final Record record, final 
ProcessContext context, final LookupContext lookupContext) {
             lookupCount++;
@@ -548,8 +577,15 @@ public class LookupRecord extends AbstractProcessor {
 
             final FlowFile flowFile = lookupContext.getOriginalFlowFile();
             final Optional<?> lookupValueOption;
+            final Optional<?> lookupValueCacheOption;
+
             try {
-                lookupValueOption = lookupService.lookup(lookupCoordinates, 
flowFile.getAttributes());
+                lookupValueCacheOption = (Optional<?>) 
cache.get(lookupCoordinates, k -> null);
+                if (lookupValueCacheOption == null) {
+                    lookupValueOption = 
lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
+                } else {
+                    lookupValueOption = lookupValueCacheOption;
+                }
             } catch (final Exception e) {
                 throw new ProcessException("Failed to lookup coordinates " + 
lookupCoordinates + " in Lookup Service", e);
             }
@@ -634,6 +670,9 @@ public class LookupRecord extends AbstractProcessor {
                     }
 
                     final Optional<?> lookupResult = 
lookupService.lookup(lookupCoordinates, flowFileAttributes);
+
+                    cache.put(lookupCoordinates, lookupResult);
+
                     if (!lookupResult.isPresent()) {
                         continue;
                     }

Reply via email to