This is an automated email from the ASF dual-hosted git repository.

otto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0fe3bb518b Feature/nifi integration address text (#755)
0fe3bb518b is described below

commit 0fe3bb518b9a987a4717df45600c51332e347e84
Author: Unai LerĂ­a Fortea <[email protected]>
AuthorDate: Tue Feb 7 19:32:55 2023 +0100

    Feature/nifi integration address text (#755)
    
    * Add Address Text property and selector
    
    * Update Readme
    
    * Added description to acceptable values in Address Access Strategy
    
    * Change AddressAccessStrategies to abstract
    
    * Separate AddressesAccessStrategies to individual files
    
    * Simplify properties initialization
    
    * Add jackson dependencies
    
    * Add first record schema cache iteration
    
    * Added an Avro RecordSchema cache with FIFO replacement policy
    
    * Add license to SchemaCache
    
    * Add PlcTags into SchemaCache
    
    * Remove invalid TODO's
    
    * Add SchemaCacheTest
    
    * Test addSchema
    * Test first in first out schema override in addSchema
    
    * Add SchemaCache documentation
    
    * Add logging of cache misses and additions into DEBUG mode/level
    
    * Add test of new feature to source processors
    
    * Minor changes in variable names and methods
    
    * Change lastSchemaPosition to nextSchemaPosition
    * Refactor setCacheSize to restartCache
    * Change validator for cache size to positive integer (>0)
---
 plc4j/integrations/apache-nifi/README.md           |  30 ++++
 .../apache-nifi/nifi-plc4x-processors/pom.xml      |  15 ++
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  |  78 +++++----
 .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java  |  32 +++-
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    |  33 +++-
 .../plc4x/nifi/Plc4xSourceRecordProcessor.java     |  46 ++++--
 .../nifi/address/AddressesAccessStrategy.java      |  26 +++
 .../plc4x/nifi/address/AddressesAccessUtils.java   |  66 ++++++++
 .../address/DynamicPropertyAccessStrategy.java     |  41 +++++
 .../nifi/address/TextPropertyAccessStrategy.java   |  46 ++++++
 .../nifi/record/Plc4xReadResponseRecordSet.java    |  13 +-
 .../org/apache/plc4x/nifi/record/Plc4xWriter.java  |  12 +-
 .../plc4x/nifi/record/RecordPlc4xWriter.java       |  22 ++-
 .../org/apache/plc4x/nifi/record/SchemaCache.java  | 139 ++++++++++++++++
 .../plc4x/nifi/Plc4xSourceProcessorTest.java       |  31 +++-
 .../plc4x/nifi/Plc4xSourceRecordProcessorTest.java |  33 ++--
 .../apache/plc4x/nifi/record/SchemaCacheTest.java  | 174 +++++++++++++++++++++
 .../apache/plc4x/nifi/util/Plc4xCommonTest.java    |  14 ++
 plc4j/integrations/apache-nifi/pom.xml             |   5 +
 19 files changed, 766 insertions(+), 90 deletions(-)

diff --git a/plc4j/integrations/apache-nifi/README.md 
b/plc4j/integrations/apache-nifi/README.md
index 8721d07131..4be71723df 100644
--- a/plc4j/integrations/apache-nifi/README.md
+++ b/plc4j/integrations/apache-nifi/README.md
@@ -18,6 +18,36 @@ under the License.
 -->
 # PLC4X Apache NiFi Integration
 
+# Common properties
+This applies to all Plc4x processors:
+
+* Address Access Strategy: defines how the processor obtains the PLC 
addresses. It can take 2 values:
+  * **Properties as Addreses:** 
+      For each variable, add a new property to the processor where the 
property name matches the variable name, and the variable value corresponds to 
the address tag. 
+
+    An *example* of these properties for reading values from a S7-1200:
+    - *var1:* *%DB1:DBX0.0:BOOL*
+    - *var2:* *%DB1:DBX0.1:BOOL*
+    - *var3:* *%DB1:DBB01:BYTE*
+    - *var4:* *%DB1:DBW02:WORD*
+    - *var5:* *%DB1:DBW04:INT*
+
+  * **Address Text:**
+    Property *Address Text* must be supplied in JSON format that contains 
variable name and address tag. Expression Language is supported.
+
+    Using the same example as before:
+    - *Address Text*:  
+    ```json
+    {
+      "var1" : "%DB1:DBX0.0:BOOL",
+      "var2" : "%DB1:DBX0.1:BOOL",
+      "var3" : "%DB1:DBB01:BYTE",
+      "var4" : "%DB1:DBW02:WORD",
+      "var5" : "%DB1:DBW04:INT" 
+    }
+    ```
+    If this JSON is in an attribute `plc4x.addresses` it can be accessed with 
*Address Text*=`${plc4x.addresses}`. 
+
 ## Plc4xSinkProcessor
 
 ## Plc4xSourceProcessor
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
index 0c6a391517..2feead4491 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
@@ -66,6 +66,11 @@
                        <artifactId>nifi-utils</artifactId>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-json-utils</artifactId>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.nifi</groupId>
                        <artifactId>nifi-avro-record-utils</artifactId>
@@ -97,6 +102,16 @@
                                </exclusion>
                        </exclusions>
                </dependency>
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-core</artifactId>
+                       <version>2.14.1</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-databind</artifactId>
+                       <version>2.14.1</version>
+               </dependency>
                <dependency>
                        <groupId>org.apache.nifi</groupId>
                        
<artifactId>nifi-schema-registry-service-api</artifactId>
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index 2a9b05cb8b..158d08a0a5 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -19,9 +19,8 @@
 package org.apache.plc4x.nifi;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -29,22 +28,44 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.plc4x.java.api.PlcConnectionManager;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.api.PlcDriverManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
+import org.apache.plc4x.nifi.address.AddressesAccessStrategy;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.record.SchemaCache;
 
 public abstract class BasePlc4xProcessor extends AbstractProcessor {
 
+    protected List<PropertyDescriptor> properties;
+    protected Set<Relationship> relationships;
+  
+    protected String connectionString;
+    protected Map<String, String> addressMap;
+
+    protected final SchemaCache schemaCache = new SchemaCache(0);
+
+    private final PlcConnectionManager connectionManager = 
CachedPlcConnectionManager.getBuilder().build();
+
+    protected static final List<AllowableValue> addressAccessStrategy = 
Collections.unmodifiableList(Arrays.asList(
+        AddressesAccessUtils.ADDRESS_PROPERTY,
+        AddressesAccessUtils.ADDRESS_TEXT));
+
+
        protected static final PropertyDescriptor PLC_CONNECTION_STRING = new 
PropertyDescriptor
         .Builder().name("PLC_CONNECTION_STRING")
         .displayName("PLC connection String")
@@ -53,6 +74,14 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
         .addValidator(new Plc4xConnectionStringValidator())
         .build();
        
+    public static final PropertyDescriptor PLC_SCHEMA_CACHE_SIZE = new 
PropertyDescriptor.Builder().name("plc4x-record-schema-cache-size")
+        .displayName("Schema Cache Size")
+               .description("Maximum number of entries in the cache. Can 
improve performance when addresses change dynamically.")
+               .defaultValue("1")
+               .required(true)
+               .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+               .build();
+
     protected static final Relationship REL_SUCCESS = new 
Relationship.Builder()
            .name("success")
            .description("Successfully processed")
@@ -64,19 +93,14 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
         .build();
 
 
-    protected List<PropertyDescriptor> properties;
-    protected Set<Relationship> relationships;
-  
-    protected String connectionString;
-    protected Map<String, String> addressMap;
-
-
-    private final PlcConnectionManager connectionManager = 
CachedPlcConnectionManager.getBuilder().build();
-
     @Override
     protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
+
        properties.add(PLC_CONNECTION_STRING);
+        properties.add(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY);
+        properties.add(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        properties.add(PLC_SCHEMA_CACHE_SIZE);
         this.properties = Collections.unmodifiableList(properties);
 
        
@@ -86,19 +110,17 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
         this.relationships = Collections.unmodifiableSet(relationships);
     }
 
-    public Map<String, String> getPlcAddress() {
-        return addressMap;
+    public Map<String, String> getPlcAddressMap(ProcessContext context, 
FlowFile flowFile) {
+        AddressesAccessStrategy strategy = 
AddressesAccessUtils.getAccessStrategy(context);
+        return strategy.extractAddresses(context, flowFile);
     }
     
     public String getConnectionString() {
         return connectionString;
     }
 
-    Collection<String> getTags() {
-        return addressMap.keySet();
-    }
-    String getAddress(String tagName) {
-        return addressMap.get(tagName);
+    public SchemaCache getSchemaCache() {
+        return schemaCache;
     }
     
        @Override
@@ -118,6 +140,7 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
                 .name(propertyDescriptorName)
                 .expressionLanguageSupported(ExpressionLanguageScope.NONE)
                 
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .dependsOn(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, 
AddressesAccessUtils.ADDRESS_PROPERTY)
                 .required(false)
                 .dynamic(true)
                 .build();
@@ -127,13 +150,7 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
                connectionString = 
context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
-               addressMap = new HashMap<>();
-               //variables are passed as dynamic properties
-               
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
-                               t -> addressMap.put(t.getName(), 
context.getProperty(t.getName()).getValue()));
-               if (addressMap.isEmpty()) {
-                       throw new PlcRuntimeException("No address specified");
-               }       
+        
schemaCache.restartCache(context.getProperty(PLC_SCHEMA_CACHE_SIZE).asInteger());
     }
 
     @Override
@@ -150,13 +167,12 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
         BasePlc4xProcessor that = (BasePlc4xProcessor) o;
         return Objects.equals(properties, that.properties) &&
             Objects.equals(getRelationships(), that.getRelationships()) &&
-            Objects.equals(getConnectionString(), that.getConnectionString()) 
&&
-            Objects.equals(addressMap, that.addressMap);
+            Objects.equals(getConnectionString(), that.getConnectionString());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), properties, getRelationships(), 
getConnectionString(), addressMap);
+        return Objects.hash(super.hashCode(), properties, getRelationships(), 
getConnectionString());
     }
 
     public static class Plc4xConnectionStringValidator implements Validator {
@@ -179,4 +195,8 @@ public abstract class BasePlc4xProcessor extends 
AbstractProcessor {
         return connectionManager;
     }
 
+    protected PlcDriver getDriver() throws PlcConnectionException {
+        return PlcDriverManager.getDefault().getDriverForUrl(connectionString);
+    }
+
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 970f693cea..a13e6c3dff 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.plc4x.nifi;
 
+import java.util.Map;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
@@ -31,6 +33,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
 
 @TriggerSerially
 @Tags({"plc4x-sink"})
@@ -56,13 +59,20 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
 
             // Prepare the request.
             PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
-            flowFile.getAttributes().forEach((tag, value) -> {
-                String address = getAddress(tag);
-                if (address != null) {
-                    // TODO: Convert the String into the right type ...
-                    builder.addTagAddress(tag, address, 
Boolean.valueOf(value));
+            Map<String,String> addressMap = getPlcAddressMap(context, 
flowFile);
+            final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
+
+            if (tags != null){
+                for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+                    builder.addTag(tag.getKey(), tag.getValue());
+                }
+            } else {
+                getLogger().debug("PlcTypes resolution not found in cache and 
will be added with key: " + addressMap.toString());
+                for (Map.Entry<String,String> entry: addressMap.entrySet()){
+                    builder.addTagAddress(entry.getKey(), entry.getValue());
                 }
-            });
+            }
+           
             PlcWriteRequest writeRequest = builder.build();
 
             // Send the request to the PLC.
@@ -70,6 +80,16 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
                 final PlcWriteResponse plcWriteResponse = 
writeRequest.execute().get();
                 // TODO: Evaluate the response and create flow files for 
successful and unsuccessful updates
                 session.transfer(flowFile, REL_SUCCESS);
+
+                if (tags == null){
+                    getLogger().debug("Adding PlcTypes resolution into cache 
with key: " + addressMap.toString());
+                    getSchemaCache().addSchema(
+                        addressMap, 
+                        writeRequest.getTagNames(),
+                        writeRequest.getTags(),
+                        null
+                    );
+                }
             } catch (Exception e) {
                 flowFile = session.putAttribute(flowFile, "exception", 
e.getLocalizedMessage());
                 session.transfer(flowFile, REL_FAILURE);
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index b10cd671b8..1a88aa4721 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -32,8 +32,10 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
 
 @Tags({"plc4x-source"})
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@@ -54,12 +56,20 @@ public class Plc4xSourceProcessor extends 
BasePlc4xProcessor {
             FlowFile flowFile = session.create();
             try {
                 PlcReadRequest.Builder builder = 
connection.readRequestBuilder();
-                getTags().forEach(tag -> {
-                    String address = getAddress(tag);
-                    if (address != null) {
-                        builder.addTagAddress(tag, address);
+                Map<String,String> addressMap = getPlcAddressMap(context, 
flowFile);
+                final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
+
+                if (tags != null){
+                    for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+                        builder.addTag(tag.getKey(), tag.getValue());
+                    }
+                } else {
+                    getLogger().debug("PlcTypes resolution not found in cache 
and will be added with key: " + addressMap.toString());
+                    for (Map.Entry<String,String> entry: 
addressMap.entrySet()){
+                        builder.addTagAddress(entry.getKey(), 
entry.getValue());
                     }
-                });
+                }
+
                 PlcReadRequest readRequest = builder.build();
                 PlcReadResponse response = readRequest.execute().get();
                 Map<String, String> attributes = new HashMap<>();
@@ -69,7 +79,18 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor 
{
                         attributes.put(tagName, String.valueOf(value));
                     }
                 }
-                flowFile = session.putAllAttributes(flowFile, attributes);   
+                flowFile = session.putAllAttributes(flowFile, attributes); 
+                
+                if (tags == null){
+                    getLogger().debug("Adding PlcTypes resolution into cache 
with key: " + addressMap.toString());
+                    getSchemaCache().addSchema(
+                        addressMap, 
+                        readRequest.getTagNames(),
+                        readRequest.getTags(),
+                        null
+                    );
+                }
+
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new ProcessException(e);
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 6ac08491ba..1af45e6bae 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -46,12 +46,13 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StopWatch;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
 import org.apache.plc4x.nifi.record.Plc4xWriter;
 import org.apache.plc4x.nifi.record.RecordPlc4xWriter;
 
@@ -103,21 +104,15 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
        @OnScheduled
        @Override
        public void onScheduled(final ProcessContext context) {
+               super.onScheduled(context);
         super.connectionString = 
context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
         this.readTimeout = 
context.getProperty(PLC_READ_FUTURE_TIMEOUT_MILISECONDS.getName()).asInteger();
-               addressMap = new HashMap<>();
-               //variables are passed as dynamic properties
-               
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
-                               t -> addressMap.put(t.getName(), 
context.getProperty(t.getName()).getValue()));
-               if (addressMap.isEmpty()) {
-                       throw new PlcRuntimeException("No address specified");
-               }       
        }
        
        @Override
        public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
                FlowFile fileToProcess = null;
-               // TODO: In the future the processor will be configurable to 
get the address and the connection from incoming flowfile
+               // TODO: In the future the processor will be configurable to 
get the connection from incoming flowfile
                if (context.hasIncomingConnection()) {
                        fileToProcess = session.get();
                        // If we have no FlowFile, and all incoming connections 
are self-loops then we
@@ -151,12 +146,21 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
                        }
 
                        PlcReadRequest.Builder builder = 
connection.readRequestBuilder();
-                       getTags().forEach(tagName -> {
-                               String address = getAddress(tagName);
-                               if (address != null) {
-                                       builder.addTagAddress(tagName, address);
+                       Map<String,String> addressMap = 
getPlcAddressMap(context, fileToProcess);
+                       final RecordSchema recordSchema = 
getSchemaCache().retrieveSchema(addressMap);
+                       final Map<String, PlcTag> tags = 
getSchemaCache().retrieveTags(addressMap);
+
+                       if (tags != null){
+                               for (Map.Entry<String,PlcTag> tag : 
tags.entrySet()){
+                                       builder.addTag(tag.getKey(), 
tag.getValue());
                                }
-                       });
+                       } else {
+                               logger.debug("Plc-Avro schema and PlcTypes 
resolution not found in cache and will be added with key: " + 
addressMap.toString());
+                               for (Map.Entry<String,String> entry: 
addressMap.entrySet()){
+                                       builder.addTagAddress(entry.getKey(), 
entry.getValue());
+                               }
+                       }
+            
                        PlcReadRequest readRequest = builder.build();
                        final FlowFile originalFlowFile = fileToProcess;
                        resultSetFF = session.write(resultSetFF, out -> {
@@ -164,9 +168,9 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
                                        PlcReadResponse readResponse = 
readRequest.execute().get(this.readTimeout, TimeUnit.MILLISECONDS);
                                        
                                        if(originalFlowFile == null) //there is 
no inherit attributes to use in writer service 
-                                               
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null));
+                                               
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, 
recordSchema));
                                        else 
-                                               
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, 
originalFlowFile));
+                                               
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, 
recordSchema, originalFlowFile));
                                } catch (InterruptedException e) {
                                        logger.error("InterruptedException 
reading the data from PLC", e);
                            Thread.currentThread().interrupt();
@@ -179,6 +183,16 @@ public class Plc4xSourceRecordProcessor extends 
BasePlc4xProcessor {
                                        throw (e instanceof ProcessException) ? 
(ProcessException) e : new ProcessException(e);
                                }
                        });
+
+                       if (recordSchema == null){
+                               logger.debug("Adding Plc-Avro schema and 
PlcTypes resolution into cache with key: " + addressMap.toString());
+                               getSchemaCache().addSchema(
+                                       addressMap, 
+                                       readRequest.getTagNames(),
+                                       readRequest.getTags(),
+                                       plc4xWriter.getRecordSchema()
+                               );
+                       }
                        long executionTimeElapsed = 
executeTime.getElapsed(TimeUnit.MILLISECONDS);
                        final Map<String, String> attributesToAdd = new 
HashMap<>();
                        attributesToAdd.put(RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
new file mode 100644
index 0000000000..31b4f429ff
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.nifi.address;
+
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+
+public interface AddressesAccessStrategy {
+    public Map<String,String> extractAddresses(final ProcessContext context, 
final FlowFile flowFile);
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
new file mode 100644
index 0000000000..b24af06e21
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.JsonValidator;
+
+public class AddressesAccessUtils {
+    public static final AllowableValue ADDRESS_PROPERTY = new AllowableValue(
+            "property-address",
+            "Use Properties as Addresses",
+            "Each property will be treated as tag-address pairs after 
Expression Language is evaluated.");
+
+    public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
+            "text-address",
+            "Use 'Address Text' Property",
+            "Addresses will be obtained from 'Address Text' Property. It's 
content must be a valid JSON " +
+                    "after Expression Language is evaluated. ");
+
+    public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("plc4x-address-access-strategy")
+            .displayName("Address Access Strategy")
+            .description("Strategy used to obtain the PLC addresses")
+            .allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT)
+            .defaultValue(ADDRESS_PROPERTY.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new 
PropertyDescriptor.Builder()
+            .name("text-address-property")
+            .displayName("Address Text")
+            .description("Must contain a valid JSON object after Expression 
Language is evaluated. "
+                    + "Each field-value is treated as tag-address.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(new JsonValidator())
+            .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
+            .required(true)
+            .build();
+
+    public static AddressesAccessStrategy getAccessStrategy(final 
ProcessContext context) {
+        String value = 
context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
+        if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
+            return new DynamicPropertyAccessStrategy();
+        else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
+            return new TextPropertyAccessStrategy();
+        return null;
+    }
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
new file mode 100644
index 0000000000..2fe6367ccf
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+
+public class DynamicPropertyAccessStrategy implements AddressesAccessStrategy{
+
+    private Map<String,String> extractAddressesFromAttributes(final 
ProcessContext context, final FlowFile flowFile) {
+        Map<String,String> addressMap = new HashMap<>();
+
+        
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
+            t -> addressMap.put(t.getName(), 
context.getProperty(t.getName()).evaluateAttributeExpressions(flowFile).getValue()));
+        
+        return addressMap; 
+    }
+
+    public Map<String, String> extractAddresses(final ProcessContext context, 
final FlowFile flowFile) {
+        return extractAddressesFromAttributes(context, flowFile);
+    }
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
new file mode 100644
index 0000000000..f172eada6c
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class TextPropertyAccessStrategy implements AddressesAccessStrategy{
+    private Map<String,String> extractAddressesFromText(String input) throws 
JsonMappingException, JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+
+        return mapper.readValue(input, Map.class);
+    }
+
+    @Override
+    public Map<String, String> extractAddresses(final ProcessContext context, 
final FlowFile flowFile) throws ProcessException{
+        try {
+            return 
extractAddressesFromText(context.getProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
+        } catch (Exception e) {
+            throw new ProcessException(e.toString());
+        }
+        
+    }
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 975d48e314..3e038c0c50 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -45,11 +45,9 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
     private final Set<String> rsColumnNames;
     private boolean moreRows;
 
-    // TODO: review this AtomicReference?
-       // TODO: this could be enhanced checking if record schema should be 
updated (via a cache boolean, checking property values is a nifi expression 
language, etc)
-       private AtomicReference<RecordSchema> recordSchema;
+       private AtomicReference<RecordSchema> recordSchema = new 
AtomicReference<RecordSchema>(null);
 
-    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse) 
throws IOException {
+    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, 
RecordSchema recordSchema) throws IOException {
         this.readResponse = readResponse;
         moreRows = true;
         
@@ -58,9 +56,10 @@ public class Plc4xReadResponseRecordSet implements 
RecordSet, Closeable {
         rsColumnNames = responseDataStructure.keySet();
                
         if (recordSchema == null) {
-               Schema avroSchema = 
Plc4xCommon.createSchema(responseDataStructure); //TODO: review this method as 
it is the 'mapping' from PlcValues to avro datatypes         
-               recordSchema = new AtomicReference<RecordSchema>();
-               recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
+               Schema avroSchema = 
Plc4xCommon.createSchema(responseDataStructure);            
+               this.recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
+        } else {
+            this.recordSchema.set(recordSchema);
         }
         logger.debug("Record schema from PlcReadResponse successfuly 
created.");
 
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
index bef85f4205..a950ac3735 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 
 public interface Plc4xWriter {
@@ -38,8 +39,8 @@ public interface Plc4xWriter {
      * @return the number of rows written to the output stream
      * @throws Exception if any errors occur during the writing of the result 
set to the output stream
      */
-    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback) 
throws Exception;
-    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, 
FlowFile originalFlowFile) throws Exception;
+    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema) throws Exception;
+    long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception;
 
     /**
      * Returns a map of attribute key/value pairs to be added to any outgoing 
flow file(s). The default implementation is to return an empty map.
@@ -72,4 +73,11 @@ public interface Plc4xWriter {
      * @return the MIME type string of the output format.
      */
     String getMimeType();
+
+
+    /**
+     * Returns the Record Schema used to create the output flowfiles. Used to 
store in schema cache.
+     * @return the Record Schema of the output.
+     */
+    RecordSchema getRecordSchema();
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index 06659efcf0..f2535604c1 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -54,9 +54,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
        }
 
        @Override
-       public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback) 
throws Exception {
+       public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema) throws Exception {
                if (fullRecordSet == null) {
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback);
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
         }
                Map<String, String> empty = new HashMap<>();
@@ -72,9 +72,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
        }
        
        @Override
-       public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, 
FlowFile originalFlowFile) throws Exception {
-               if (fullRecordSet == null) {    
-            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback);
+       public long writePlcReadResponse(PlcReadResponse response, OutputStream 
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, 
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception {
+        if (fullRecordSet == null) {   
+            fullRecordSet = new 
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, 
fullRecordSet.getSchema());
          }
         try (final RecordSetWriter resultSetWriter = 
recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, 
originalFlowFile)) {
@@ -142,8 +142,8 @@ public class RecordPlc4xWriter implements Plc4xWriter {
        
        private static class Plc4xReadResponseRecordSetWithCallback extends 
Plc4xReadResponseRecordSet {
         private final Plc4xReadResponseRowCallback callback;
-        public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse 
readResponse, Plc4xReadResponseRowCallback callback) throws IOException {
-            super(readResponse);
+        public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse 
readResponse, Plc4xReadResponseRowCallback callback, RecordSchema recordSchema) 
throws IOException {
+            super(readResponse, recordSchema);
             this.callback = callback;
         }
         @Override
@@ -162,4 +162,12 @@ public class RecordPlc4xWriter implements Plc4xWriter {
         }
        }
 
+    public RecordSchema getRecordSchema(){
+        try {
+            return this.fullRecordSet.getSchema();
+        } catch (IOException e){
+            return null;
+        }
+    }
+
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
new file mode 100644
index 0000000000..f925aceea0
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.nifi.record;
+
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.plc4x.java.api.model.PlcTag;
+
+public class SchemaCache {
+    private ConcurrentMap<String, SchemaContainer> schemaMap = new 
ConcurrentHashMap<>();
+    private AtomicReferenceArray<String> schemaAppendOrder = new 
AtomicReferenceArray<>(0);
+    private final AtomicInteger nextSchemaPosition = new AtomicInteger(0);
+    private final AtomicInteger cacheSize = new AtomicInteger(0);
+
+    /** Creates a schema cache with first-in-first-out replacement policy. 
Stores PlcTags and RecordSchema used for PlcResponse serialization
+     * @param cacheSize initial cache size
+     */
+    public SchemaCache(int cacheSize) {
+        this.cacheSize.set(cacheSize);
+    }
+
+    /** Empties and restart the cache with the given size
+     * @param cacheSize size of schema cache
+     */
+    public void restartCache(int cacheSize) {
+        this.cacheSize.set(cacheSize);
+        this.schemaAppendOrder = new AtomicReferenceArray<>(cacheSize);
+        this.schemaMap = new ConcurrentHashMap<>();
+        this.nextSchemaPosition.set(0);
+    }
+
+
+    /** Adds the schema to the cache if not present. When the cache is full 
first-in-first-out replacement policy applies
+     * @param schemaIdentifier tagName-address map used to store the schema 
+     * @param tagsNames list of tag names
+     * @param tagsList list of PlcTag's
+     * @param schema record schema used for PlcResponse serialization. Can be 
null
+     */
+    public void addSchema(final Map<String,String> schemaIdentifier, final 
LinkedHashSet<String> tagsNames, final List<PlcTag> tagsList,  final 
RecordSchema schema) {        
+        if (!schemaMap.containsKey(schemaIdentifier.toString())){
+            if (nextSchemaPosition.get() == cacheSize.get()){
+                nextSchemaPosition.set(0);
+            }
+            removeSchema(schemaAppendOrder.get(nextSchemaPosition.get()));
+
+            Map<String, PlcTag> tags = new HashMap<>();
+            for (int i=0; i<tagsNames.size(); i++){
+                tags.put(tagsNames.toArray(new String[]{})[i], 
tagsList.get(i));
+            }
+            schemaMap.put(schemaIdentifier.toString(), new 
SchemaContainer(tags, schema));
+            schemaAppendOrder.set(nextSchemaPosition.get(), 
schemaIdentifier.toString());
+            nextSchemaPosition.getAndAdd(1);
+        }    
+    }
+
+    /** Removes the schema from the cache
+     * @param schemaIdentifier tagName-address map used to store the schema 
+     */
+    public void removeSchema(final String schemaIdentifier) {
+        if (schemaIdentifier == null)
+            return;
+        if (schemaMap.containsKey(schemaIdentifier)){
+            schemaMap.remove(schemaIdentifier);
+        }
+    }
+
+
+    /** Retrieves a schema from the cache if found
+     * @param schemaIdentifier tagName-address map used to store the schema 
+     * @return RecordSchema used for PlcResponse serialization. Null if not 
found
+     */
+    public RecordSchema retrieveSchema(final Map<String,String> 
schemaIdentifier) { 
+        if (schemaMap.containsKey(schemaIdentifier.toString())){
+            return schemaMap.get(schemaIdentifier.toString()).getSchema();
+        }
+        return null;
+    }
+
+    /** Retrieves tags from the cache if found
+     * @param schemaIdentifier tagName-address map used to store the schema 
+     * @return Map between tag names and the corresponding PlcTag. Null if not 
found
+     */
+    public Map<String, PlcTag> retrieveTags(final Map<String,String> 
schemaIdentifier) { 
+        if (schemaMap.containsKey(schemaIdentifier.toString())){
+            return schemaMap.get(schemaIdentifier.toString()).getTags();
+        }
+        return null;
+    }
+
+    protected int getNextSchemaPosition() {
+        return nextSchemaPosition.get();
+    }
+
+    protected int getCacheSize() {
+        return cacheSize.get();
+    }
+
+    public class SchemaContainer {
+        private RecordSchema schema;
+        private Map<String, PlcTag> tags;
+
+        public Map<String, PlcTag> getTags() {
+            return tags;
+        }
+
+        public RecordSchema getSchema() {
+            return schema;
+        }
+
+        SchemaContainer(Map<String, PlcTag> tags, RecordSchema schema){
+            this.tags = tags;
+            this.schema = schema;
+        }
+    }
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
index f713f49d6e..bf742a15cf 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
@@ -20,22 +20,49 @@ package org.apache.plc4x.nifi;
 
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.apache.plc4x.nifi.Plc4xSourceProcessor;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 public class Plc4xSourceProcessorTest {
 
     private TestRunner testRunner;
+    private static int NUMBER_OF_CALLS = 5;
 
     @BeforeEach
     public void init() {
         testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        testRunner.setIncomingConnection(false);
+        testRunner.setValidateExpressionUsage(false);
+
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, 
"simulated://127.0.0.1");
+
+        testRunner.addConnection(Plc4xSourceProcessor.REL_SUCCESS);
+        testRunner.addConnection(Plc4xSourceProcessor.REL_FAILURE);
     }
 
-    @Test
     public void testProcessor() {
 
+        testRunner.run(NUMBER_OF_CALLS);
+        testRunner.assertTransferCount(Plc4xSourceProcessor.REL_FAILURE, 0);
+        testRunner.assertTransferCount(Plc4xSourceProcessor.REL_SUCCESS, 
NUMBER_OF_CALLS);
+    }
+
+    // Test dynamic properties addressess access strategy
+    @Test
+    public void testWithAddressProperties() {
+        
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, 
AddressesAccessUtils.ADDRESS_PROPERTY);
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(k, v));
+        testProcessor();
+    }
+
+    // Test addressess text property access strategy
+    @Test
+    public void testWithAddressText() { 
+        
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, 
AddressesAccessUtils.ADDRESS_TEXT);
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, 
Plc4xCommonTest.getAddressMap().toString());
+        testProcessor();
     }
 
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
index e508318571..434d22a472 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
@@ -18,12 +18,11 @@
  */
 package org.apache.plc4x.nifi;
 
-import java.util.Map;
-
 import org.apache.nifi.avro.AvroRecordSetWriter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,6 +42,7 @@ public class Plc4xSourceRecordProcessorTest {
 
        
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_READ_FUTURE_TIMEOUT_MILISECONDS,
 "100");
        
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_CONNECTION_STRING, 
"simulated://127.0.0.1");
+               
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_SCHEMA_CACHE_SIZE, "1");
 
        testRunner.addConnection(Plc4xSourceRecordProcessor.REL_SUCCESS);
        testRunner.addConnection(Plc4xSourceRecordProcessor.REL_FAILURE);
@@ -50,22 +50,9 @@ public class Plc4xSourceRecordProcessorTest {
                testRunner.addControllerService("writer", writerService);
        testRunner.enableControllerService(writerService);
                
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_RECORD_WRITER_FACTORY.getName(),
 "writer");
-
-               for (Map.Entry<String,String> address 
:Plc4xCommonTest.addressMap.entrySet()) {
-                       // TODO: Random generation not working with this types
-                       if (address.getValue().startsWith("RANDOM/")) {
-                               if (address.getValue().endsWith("BYTE") ||
-                                       address.getValue().endsWith("CHAR") ||
-                                       address.getValue().endsWith("STRING"))
-                                       continue;
-                       }
-                       testRunner.setProperty(address.getKey(), 
address.getValue());
-               }
     }
 
-    @Test
     public void testAvroRecordWriterProcessor() throws InitializationException 
{       
-       
        testRunner.run(NUMBER_OF_CALLS,true, true);
        //validations
        testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE, 
0);
@@ -73,4 +60,20 @@ public class Plc4xSourceRecordProcessorTest {
 
                
Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceProcessor.REL_SUCCESS),
 false, true);
     }
+
+       // Test dynamic properties addressess access strategy
+       @Test
+    public void testWithAddressProperties() throws InitializationException {
+        
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, 
AddressesAccessUtils.ADDRESS_PROPERTY);
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> 
testRunner.setProperty(k, v));
+        testAvroRecordWriterProcessor();
+    }
+
+       // Test addressess text property access strategy
+    @Test
+    public void testWithAddressText() throws InitializationException { 
+        
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, 
AddressesAccessUtils.ADDRESS_TEXT);
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, 
Plc4xCommonTest.getAddressMap().toString());
+        testAvroRecordWriterProcessor();
+    }
 }
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/record/SchemaCacheTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/record/SchemaCacheTest.java
new file mode 100644
index 0000000000..e1d641cefc
--- /dev/null
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/record/SchemaCacheTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.record;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.FieldAssembler;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SchemaCacheTest {
+
+    private static SchemaCache schemaCache = new SchemaCache(0);
+    private static List<RecordSchema> schemas = new ArrayList<>();
+    private static List<List<PlcTag>> tags = new ArrayList<>();
+    private static List<Map<String, String>> addresses = new ArrayList<>();
+    private static List<LinkedHashSet<String>> tagNames = new ArrayList<>();
+
+    @BeforeAll
+    static public void innit() {
+        // Create 5 unique Schemas
+
+        for (int i = 0; i < 5; i++) {
+            FieldAssembler<Schema> fieldAssembler = 
SchemaBuilder.builder().record("foo").fields();
+            Collection<String> collection = new HashSet<>();
+            List<PlcTag> innerTags = new ArrayList<>();
+            String address = null;
+
+            switch (i) {
+                case 0:
+                    address = "RANDOM/fooBOOL:BOOL";
+                    addresses.add(Map.of("BOOL", address));
+                    innerTags.add(new TestPlcTag(address));
+                    tags.add(innerTags);
+                    fieldAssembler.nullableBoolean("BOOL", false);
+                    collection.add("fooBOOL");
+                    tagNames.add(new LinkedHashSet<>(collection));
+                    break;
+                case 1:
+                    address = "RANDOM/fooINT:INT";
+                    addresses.add(Map.of("INT", address));
+                    innerTags.add(new TestPlcTag(address));
+                    tags.add(innerTags);
+                    fieldAssembler.nullableInt("INT", 1);
+                    collection.add("fooINT");
+                    tagNames.add(new LinkedHashSet<>(collection));
+                    break;
+                case 2:
+                    address = "RANDOM/fooLONG:LONG";
+                    addresses.add(Map.of("LONG", address));
+                    innerTags.add(new TestPlcTag(address));
+                    tags.add(innerTags);
+                    fieldAssembler.nullableLong("LONG", i * 100);
+                    collection.add("fooLONG");
+                    tagNames.add(new LinkedHashSet<>(collection));
+                    break;
+                case 3:
+                    address = "RANDOM/fooFLOAT:FLOAT";
+                    addresses.add(Map.of("FLOAT", address));
+                    innerTags.add(new TestPlcTag(address));
+                    tags.add(innerTags);
+                    fieldAssembler.nullableFloat("FLOAT", i * 0.1F);
+                    collection.add("fooFLOAT");
+                    tagNames.add(new LinkedHashSet<>(collection));
+                    break;
+                case 4:
+                    address = "RANDOM/fooDOUBLE:DOUBLE";
+                    addresses.add(Map.of("DOUBLE", address));
+                    innerTags.add(new TestPlcTag(address));
+                    tags.add(innerTags);
+                    fieldAssembler.nullableDouble("DOUBLE", i * 0.01);
+                    collection.add("fooDOUBLE");
+                    tagNames.add(new LinkedHashSet<>(collection));
+                    break;
+            }
+            Schema avroSchema = fieldAssembler.endRecord();
+            schemas.add(AvroTypeUtil.createSchema(avroSchema));
+        }
+    }
+
+    // Cache size set to 4 < number of schemas: to check schema override.
+    @BeforeEach
+    public void testCacheSize() {
+        schemaCache.restartCache(4);
+        assert schemaCache.getCacheSize() == 4;
+        assert schemaCache.getNextSchemaPosition() == 0;
+    }
+
+    // In this test we add 4 schemas and try to add schema 0 again. It should 
not be added.
+    @Test
+    public void testAddSchema() {
+        for (int i = 0; i < 4; i++) {
+            schemaCache.addSchema(addresses.get(i), tagNames.get(i), 
tags.get(i), schemas.get(i));
+            assert schemaCache.getNextSchemaPosition() == i + 1;
+        }
+        int prev = schemaCache.getNextSchemaPosition();
+        schemaCache.addSchema(addresses.get(0), tagNames.get(0), tags.get(0), 
schemas.get(0));
+        assert prev == schemaCache.getNextSchemaPosition();
+    }
+
+    // In this test check schema overriding
+    @Test
+    public void testSchemaOverride() {
+        for (int i = 0; i < 4; i++) {
+            schemaCache.addSchema(addresses.get(i), tagNames.get(i), 
tags.get(i), schemas.get(i));
+            assert schemaCache.getNextSchemaPosition() == i + 1;
+        }
+        // Override first schema
+        schemaCache.addSchema(addresses.get(4), tagNames.get(4), tags.get(4), 
schemas.get(4));
+        assert schemaCache.getNextSchemaPosition() == 1;
+
+        // First schema should not be present in the cache
+        assert schemaCache.retrieveSchema(addresses.get(0)) == null;
+
+        // Check remaining schemas
+        for (int i=1; i<5; i++){
+            assert schemaCache.retrieveSchema(addresses.get(i)) == 
schemas.get(i);
+        }
+    }
+
+
+    public static void main(String[] args) {
+        SchemaCacheTest.innit();
+
+        SchemaCacheTest instance = new SchemaCacheTest();
+
+        instance.testCacheSize();
+        instance.testAddSchema();
+
+        instance.testCacheSize();
+        instance.testSchemaOverride();
+    }
+
+
+    private static class TestPlcTag implements PlcTag {
+
+        String address;
+
+        TestPlcTag(String address) {
+            this.address = address;
+        }
+
+        @Override
+        public String getAddressString() {
+            return address;
+        }
+    }
+}
diff --git 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
index 425bd0b30e..eb532ea268 100644
--- 
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
+++ 
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
@@ -97,6 +97,20 @@ public class Plc4xCommonTest {
         typeMap.put("STRING", Utf8.class);
     }
 
+    public static Map<String, String> getAddressMap(){
+        Map<String, String> result = new HashMap<>();
+
+        addressMap.forEach((k,v) -> {
+                       if (!v.startsWith("RANDOM/")) {
+                               if (!v.endsWith("BYTE") &&
+                                       !v.endsWith("CHAR") &&
+                                       !v.endsWith("STRING"))
+                                       result.put(k, v);
+                       }
+               });
+        return result;
+    }
+
     public static void assertAvroContent(List<MockFlowFile> flowfiles, boolean 
checkValue, boolean checkType) {
         flowfiles.forEach(new Consumer<MockFlowFile>() {
             @Override
diff --git a/plc4j/integrations/apache-nifi/pom.xml 
b/plc4j/integrations/apache-nifi/pom.xml
index 2f4fd33e0f..65b2f74a11 100644
--- a/plc4j/integrations/apache-nifi/pom.xml
+++ b/plc4j/integrations/apache-nifi/pom.xml
@@ -111,6 +111,11 @@
         <artifactId>nifi-avro-record-utils</artifactId>
         <version>${nifi.version}</version>
       </dependency>
+         <dependency>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-json-utils</artifactId>
+               <version>${nifi.version}</version>
+               </dependency>
       <dependency>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-record-serialization-services</artifactId>

Reply via email to