This is an automated email from the ASF dual-hosted git repository.
otto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 0fe3bb518b Feature/nifi integration address text (#755)
0fe3bb518b is described below
commit 0fe3bb518b9a987a4717df45600c51332e347e84
Author: Unai Lería Fortea <[email protected]>
AuthorDate: Tue Feb 7 19:32:55 2023 +0100
Feature/nifi integration address text (#755)
* Add Address Text property and selector
* Update Readme
* Added description to acceptable values in Address Access Strategy
* Change AddressAccessStrategies to abstract
* Separate AddressesAccessStrategies to individual files
* Simplify properties initialization
* Add jackson dependencies
* Add first record schema cache iteration
* Added an Avro RecordSchema cache with FIFO replacement policy
* Add license to SchemaCache
* Add PlcTags into SchemaCache
* Remove invalid TODO's
* Add SchemaCacheTest
* Test addSchema
* Test first in first out schema override in addSchema
* Add SchemaCache documentation
* Add logging of cache misses and additions into DEBUG mode/level
* Add test of new feature to source processors
* Minor changes in variable names and methods
* Change lastSchemaPosition to nextSchemaPosition
* Refactor setCacheSize to restartCache
* Change validator for cache size to positive integer (>0)
---
plc4j/integrations/apache-nifi/README.md | 30 ++++
.../apache-nifi/nifi-plc4x-processors/pom.xml | 15 ++
.../org/apache/plc4x/nifi/BasePlc4xProcessor.java | 78 +++++----
.../org/apache/plc4x/nifi/Plc4xSinkProcessor.java | 32 +++-
.../apache/plc4x/nifi/Plc4xSourceProcessor.java | 33 +++-
.../plc4x/nifi/Plc4xSourceRecordProcessor.java | 46 ++++--
.../nifi/address/AddressesAccessStrategy.java | 26 +++
.../plc4x/nifi/address/AddressesAccessUtils.java | 66 ++++++++
.../address/DynamicPropertyAccessStrategy.java | 41 +++++
.../nifi/address/TextPropertyAccessStrategy.java | 46 ++++++
.../nifi/record/Plc4xReadResponseRecordSet.java | 13 +-
.../org/apache/plc4x/nifi/record/Plc4xWriter.java | 12 +-
.../plc4x/nifi/record/RecordPlc4xWriter.java | 22 ++-
.../org/apache/plc4x/nifi/record/SchemaCache.java | 139 ++++++++++++++++
.../plc4x/nifi/Plc4xSourceProcessorTest.java | 31 +++-
.../plc4x/nifi/Plc4xSourceRecordProcessorTest.java | 33 ++--
.../apache/plc4x/nifi/record/SchemaCacheTest.java | 174 +++++++++++++++++++++
.../apache/plc4x/nifi/util/Plc4xCommonTest.java | 14 ++
plc4j/integrations/apache-nifi/pom.xml | 5 +
19 files changed, 766 insertions(+), 90 deletions(-)
diff --git a/plc4j/integrations/apache-nifi/README.md
b/plc4j/integrations/apache-nifi/README.md
index 8721d07131..4be71723df 100644
--- a/plc4j/integrations/apache-nifi/README.md
+++ b/plc4j/integrations/apache-nifi/README.md
@@ -18,6 +18,36 @@ under the License.
-->
# PLC4X Apache NiFi Integration
+# Common properties
+This applies to all Plc4x processors:
+
+* Address Access Strategy: defines how the processor obtains the PLC
addresses. It can take 2 values:
+ * **Properties as Addresses:**
+ For each variable, add a new property to the processor where the
property name matches the variable name, and the variable value corresponds to
the address tag.
+
+ An *example* of these properties for reading values from an S7-1200:
+ - *var1:* *%DB1:DBX0.0:BOOL*
+ - *var2:* *%DB1:DBX0.1:BOOL*
+ - *var3:* *%DB1:DBB01:BYTE*
+ - *var4:* *%DB1:DBW02:WORD*
+ - *var5:* *%DB1:DBW04:INT*
+
+ * **Address Text:**
+ Property *Address Text* must be supplied in JSON format that contains
variable name and address tag. Expression Language is supported.
+
+ Using the same example as before:
+ - *Address Text*:
+ ```json
+ {
+ "var1" : "%DB1:DBX0.0:BOOL",
+ "var2" : "%DB1:DBX0.1:BOOL",
+ "var3" : "%DB1:DBB01:BYTE",
+ "var4" : "%DB1:DBW02:WORD",
+ "var5" : "%DB1:DBW04:INT"
+ }
+ ```
+ If this JSON is in an attribute `plc4x.addresses` it can be accessed with
*Address Text*=`${plc4x.addresses}`.
+
## Plc4xSinkProcessor
## Plc4xSourceProcessor
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
index 0c6a391517..2feead4491 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
@@ -66,6 +66,11 @@
<artifactId>nifi-utils</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-utils</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
@@ -97,6 +102,16 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>2.14.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.14.1</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index 2a9b05cb8b..158d08a0a5 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -19,9 +19,8 @@
package org.apache.plc4x.nifi;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -29,22 +28,44 @@ import java.util.Objects;
import java.util.Set;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.plc4x.java.api.PlcConnectionManager;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.java.api.PlcDriverManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
+import org.apache.plc4x.nifi.address.AddressesAccessStrategy;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.record.SchemaCache;
public abstract class BasePlc4xProcessor extends AbstractProcessor {
+ protected List<PropertyDescriptor> properties;
+ protected Set<Relationship> relationships;
+
+ protected String connectionString;
+ protected Map<String, String> addressMap;
+
+ protected final SchemaCache schemaCache = new SchemaCache(0);
+
+ private final PlcConnectionManager connectionManager =
CachedPlcConnectionManager.getBuilder().build();
+
+ protected static final List<AllowableValue> addressAccessStrategy =
Collections.unmodifiableList(Arrays.asList(
+ AddressesAccessUtils.ADDRESS_PROPERTY,
+ AddressesAccessUtils.ADDRESS_TEXT));
+
+
protected static final PropertyDescriptor PLC_CONNECTION_STRING = new
PropertyDescriptor
.Builder().name("PLC_CONNECTION_STRING")
.displayName("PLC connection String")
@@ -53,6 +74,14 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
.addValidator(new Plc4xConnectionStringValidator())
.build();
+ public static final PropertyDescriptor PLC_SCHEMA_CACHE_SIZE = new
PropertyDescriptor.Builder().name("plc4x-record-schema-cache-size")
+ .displayName("Schema Cache Size")
+ .description("Maximum number of entries in the cache. Can
improve performance when addresses change dynamically.")
+ .defaultValue("1")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
protected static final Relationship REL_SUCCESS = new
Relationship.Builder()
.name("success")
.description("Successfully processed")
@@ -64,19 +93,14 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
.build();
- protected List<PropertyDescriptor> properties;
- protected Set<Relationship> relationships;
-
- protected String connectionString;
- protected Map<String, String> addressMap;
-
-
- private final PlcConnectionManager connectionManager =
CachedPlcConnectionManager.getBuilder().build();
-
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
+
properties.add(PLC_CONNECTION_STRING);
+ properties.add(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY);
+ properties.add(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+ properties.add(PLC_SCHEMA_CACHE_SIZE);
this.properties = Collections.unmodifiableList(properties);
@@ -86,19 +110,17 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
this.relationships = Collections.unmodifiableSet(relationships);
}
- public Map<String, String> getPlcAddress() {
- return addressMap;
+ public Map<String, String> getPlcAddressMap(ProcessContext context,
FlowFile flowFile) {
+ AddressesAccessStrategy strategy =
AddressesAccessUtils.getAccessStrategy(context);
+ return strategy.extractAddresses(context, flowFile);
}
public String getConnectionString() {
return connectionString;
}
- Collection<String> getTags() {
- return addressMap.keySet();
- }
- String getAddress(String tagName) {
- return addressMap.get(tagName);
+ public SchemaCache getSchemaCache() {
+ return schemaCache;
}
@Override
@@ -118,6 +140,7 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
.name(propertyDescriptorName)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .dependsOn(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY,
AddressesAccessUtils.ADDRESS_PROPERTY)
.required(false)
.dynamic(true)
.build();
@@ -127,13 +150,7 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
connectionString =
context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
- addressMap = new HashMap<>();
- //variables are passed as dynamic properties
-
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
- t -> addressMap.put(t.getName(),
context.getProperty(t.getName()).getValue()));
- if (addressMap.isEmpty()) {
- throw new PlcRuntimeException("No address specified");
- }
+
schemaCache.restartCache(context.getProperty(PLC_SCHEMA_CACHE_SIZE).asInteger());
}
@Override
@@ -150,13 +167,12 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
BasePlc4xProcessor that = (BasePlc4xProcessor) o;
return Objects.equals(properties, that.properties) &&
Objects.equals(getRelationships(), that.getRelationships()) &&
- Objects.equals(getConnectionString(), that.getConnectionString())
&&
- Objects.equals(addressMap, that.addressMap);
+ Objects.equals(getConnectionString(), that.getConnectionString());
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), properties, getRelationships(),
getConnectionString(), addressMap);
+ return Objects.hash(super.hashCode(), properties, getRelationships(),
getConnectionString());
}
public static class Plc4xConnectionStringValidator implements Validator {
@@ -179,4 +195,8 @@ public abstract class BasePlc4xProcessor extends
AbstractProcessor {
return connectionManager;
}
+ protected PlcDriver getDriver() throws PlcConnectionException {
+ return PlcDriverManager.getDefault().getDriverForUrl(connectionString);
+ }
+
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 970f693cea..a13e6c3dff 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -18,6 +18,8 @@
*/
package org.apache.plc4x.nifi;
+import java.util.Map;
+
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
@@ -31,6 +33,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
@TriggerSerially
@Tags({"plc4x-sink"})
@@ -56,13 +59,20 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
// Prepare the request.
PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
- flowFile.getAttributes().forEach((tag, value) -> {
- String address = getAddress(tag);
- if (address != null) {
- // TODO: Convert the String into the right type ...
- builder.addTagAddress(tag, address,
Boolean.valueOf(value));
+ Map<String,String> addressMap = getPlcAddressMap(context,
flowFile);
+ final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
+
+ if (tags != null){
+ for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+ builder.addTag(tag.getKey(), tag.getValue());
+ }
+ } else {
+ getLogger().debug("PlcTypes resolution not found in cache and
will be added with key: " + addressMap.toString());
+ for (Map.Entry<String,String> entry: addressMap.entrySet()){
+ builder.addTagAddress(entry.getKey(), entry.getValue());
}
- });
+ }
+
PlcWriteRequest writeRequest = builder.build();
// Send the request to the PLC.
@@ -70,6 +80,16 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
final PlcWriteResponse plcWriteResponse =
writeRequest.execute().get();
// TODO: Evaluate the response and create flow files for
successful and unsuccessful updates
session.transfer(flowFile, REL_SUCCESS);
+
+ if (tags == null){
+ getLogger().debug("Adding PlcTypes resolution into cache
with key: " + addressMap.toString());
+ getSchemaCache().addSchema(
+ addressMap,
+ writeRequest.getTagNames(),
+ writeRequest.getTags(),
+ null
+ );
+ }
} catch (Exception e) {
flowFile = session.putAttribute(flowFile, "exception",
e.getLocalizedMessage());
session.transfer(flowFile, REL_FAILURE);
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index b10cd671b8..1a88aa4721 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -32,8 +32,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
@Tags({"plc4x-source"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@@ -54,12 +56,20 @@ public class Plc4xSourceProcessor extends
BasePlc4xProcessor {
FlowFile flowFile = session.create();
try {
PlcReadRequest.Builder builder =
connection.readRequestBuilder();
- getTags().forEach(tag -> {
- String address = getAddress(tag);
- if (address != null) {
- builder.addTagAddress(tag, address);
+ Map<String,String> addressMap = getPlcAddressMap(context,
flowFile);
+ final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
+
+ if (tags != null){
+ for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+ builder.addTag(tag.getKey(), tag.getValue());
+ }
+ } else {
+ getLogger().debug("PlcTypes resolution not found in cache
and will be added with key: " + addressMap.toString());
+ for (Map.Entry<String,String> entry:
addressMap.entrySet()){
+ builder.addTagAddress(entry.getKey(),
entry.getValue());
}
- });
+ }
+
PlcReadRequest readRequest = builder.build();
PlcReadResponse response = readRequest.execute().get();
Map<String, String> attributes = new HashMap<>();
@@ -69,7 +79,18 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor
{
attributes.put(tagName, String.valueOf(value));
}
}
- flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ if (tags == null){
+ getLogger().debug("Adding PlcTypes resolution into cache
with key: " + addressMap.toString());
+ getSchemaCache().addSchema(
+ addressMap,
+ readRequest.getTagNames(),
+ readRequest.getTags(),
+ null
+ );
+ }
+
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ProcessException(e);
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 6ac08491ba..1af45e6bae 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -46,12 +46,13 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
import org.apache.plc4x.nifi.record.Plc4xWriter;
import org.apache.plc4x.nifi.record.RecordPlc4xWriter;
@@ -103,21 +104,15 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
@OnScheduled
@Override
public void onScheduled(final ProcessContext context) {
+ super.onScheduled(context);
super.connectionString =
context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
this.readTimeout =
context.getProperty(PLC_READ_FUTURE_TIMEOUT_MILISECONDS.getName()).asInteger();
- addressMap = new HashMap<>();
- //variables are passed as dynamic properties
-
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
- t -> addressMap.put(t.getName(),
context.getProperty(t.getName()).getValue()));
- if (addressMap.isEmpty()) {
- throw new PlcRuntimeException("No address specified");
- }
}
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null;
- // TODO: In the future the processor will be configurable to
get the address and the connection from incoming flowfile
+ // TODO: In the future the processor will be configurable to
get the connection from incoming flowfile
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
// If we have no FlowFile, and all incoming connections
are self-loops then we
@@ -151,12 +146,21 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
}
PlcReadRequest.Builder builder =
connection.readRequestBuilder();
- getTags().forEach(tagName -> {
- String address = getAddress(tagName);
- if (address != null) {
- builder.addTagAddress(tagName, address);
+ Map<String,String> addressMap =
getPlcAddressMap(context, fileToProcess);
+ final RecordSchema recordSchema =
getSchemaCache().retrieveSchema(addressMap);
+ final Map<String, PlcTag> tags =
getSchemaCache().retrieveTags(addressMap);
+
+ if (tags != null){
+ for (Map.Entry<String,PlcTag> tag :
tags.entrySet()){
+ builder.addTag(tag.getKey(),
tag.getValue());
}
- });
+ } else {
+ logger.debug("Plc-Avro schema and PlcTypes
resolution not found in cache and will be added with key: " +
addressMap.toString());
+ for (Map.Entry<String,String> entry:
addressMap.entrySet()){
+ builder.addTagAddress(entry.getKey(),
entry.getValue());
+ }
+ }
+
PlcReadRequest readRequest = builder.build();
final FlowFile originalFlowFile = fileToProcess;
resultSetFF = session.write(resultSetFF, out -> {
@@ -164,9 +168,9 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
PlcReadResponse readResponse =
readRequest.execute().get(this.readTimeout, TimeUnit.MILLISECONDS);
if(originalFlowFile == null) //there is
no inherit attributes to use in writer service
-
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null));
+
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null,
recordSchema));
else
-
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null,
originalFlowFile));
+
nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null,
recordSchema, originalFlowFile));
} catch (InterruptedException e) {
logger.error("InterruptedException
reading the data from PLC", e);
Thread.currentThread().interrupt();
@@ -179,6 +183,16 @@ public class Plc4xSourceRecordProcessor extends
BasePlc4xProcessor {
throw (e instanceof ProcessException) ?
(ProcessException) e : new ProcessException(e);
}
});
+
+ if (recordSchema == null){
+ logger.debug("Adding Plc-Avro schema and
PlcTypes resolution into cache with key: " + addressMap.toString());
+ getSchemaCache().addSchema(
+ addressMap,
+ readRequest.getTagNames(),
+ readRequest.getTags(),
+ plc4xWriter.getRecordSchema()
+ );
+ }
long executionTimeElapsed =
executeTime.getElapsed(TimeUnit.MILLISECONDS);
final Map<String, String> attributesToAdd = new
HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
new file mode 100644
index 0000000000..31b4f429ff
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.nifi.address;
+
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+
+public interface AddressesAccessStrategy {
+ public Map<String,String> extractAddresses(final ProcessContext context,
final FlowFile flowFile);
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
new file mode 100644
index 0000000000..b24af06e21
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.JsonValidator;
+
+public class AddressesAccessUtils {
+ public static final AllowableValue ADDRESS_PROPERTY = new AllowableValue(
+ "property-address",
+ "Use Properties as Addresses",
+ "Each property will be treated as tag-address pairs after
Expression Language is evaluated.");
+
+ public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
+ "text-address",
+ "Use 'Address Text' Property",
+ "Addresses will be obtained from 'Address Text' Property. It's
content must be a valid JSON " +
+ "after Expression Language is evaluated. ");
+
+ public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("plc4x-address-access-strategy")
+ .displayName("Address Access Strategy")
+ .description("Strategy used to obtain the PLC addresses")
+ .allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT)
+ .defaultValue(ADDRESS_PROPERTY.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new
PropertyDescriptor.Builder()
+ .name("text-address-property")
+ .displayName("Address Text")
+ .description("Must contain a valid JSON object after Expression
Language is evaluated. "
+ + "Each field-value is treated as tag-address.")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(new JsonValidator())
+ .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
+ .required(true)
+ .build();
+
+ public static AddressesAccessStrategy getAccessStrategy(final
ProcessContext context) {
+ String value =
context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
+ if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
+ return new DynamicPropertyAccessStrategy();
+ else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
+ return new TextPropertyAccessStrategy();
+ return null;
+ }
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
new file mode 100644
index 0000000000..2fe6367ccf
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+
+public class DynamicPropertyAccessStrategy implements AddressesAccessStrategy{
+
+ private Map<String,String> extractAddressesFromAttributes(final
ProcessContext context, final FlowFile flowFile) {
+ Map<String,String> addressMap = new HashMap<>();
+
+
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
+ t -> addressMap.put(t.getName(),
context.getProperty(t.getName()).evaluateAttributeExpressions(flowFile).getValue()));
+
+ return addressMap;
+ }
+
+ public Map<String, String> extractAddresses(final ProcessContext context,
final FlowFile flowFile) {
+ return extractAddressesFromAttributes(context, flowFile);
+ }
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
new file mode 100644
index 0000000000..f172eada6c
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class TextPropertyAccessStrategy implements AddressesAccessStrategy{
+ private Map<String,String> extractAddressesFromText(String input) throws
JsonMappingException, JsonProcessingException {
+ ObjectMapper mapper = new ObjectMapper();
+
+ return mapper.readValue(input, Map.class);
+ }
+
+ @Override
+ public Map<String, String> extractAddresses(final ProcessContext context,
final FlowFile flowFile) throws ProcessException{
+ try {
+ return
extractAddressesFromText(context.getProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
+ } catch (Exception e) {
+ throw new ProcessException(e.toString());
+ }
+
+ }
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 975d48e314..3e038c0c50 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -45,11 +45,9 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
private final Set<String> rsColumnNames;
private boolean moreRows;
- // TODO: review this AtomicReference?
- // TODO: this could be enhanced checking if record schema should be
updated (via a cache boolean, checking property values is a nifi expression
language, etc)
- private AtomicReference<RecordSchema> recordSchema;
+ private AtomicReference<RecordSchema> recordSchema = new
AtomicReference<RecordSchema>(null);
- public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse)
throws IOException {
+ public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse,
RecordSchema recordSchema) throws IOException {
this.readResponse = readResponse;
moreRows = true;
@@ -58,9 +56,10 @@ public class Plc4xReadResponseRecordSet implements
RecordSet, Closeable {
rsColumnNames = responseDataStructure.keySet();
if (recordSchema == null) {
- Schema avroSchema =
Plc4xCommon.createSchema(responseDataStructure); //TODO: review this method as
it is the 'mapping' from PlcValues to avro datatypes
- recordSchema = new AtomicReference<RecordSchema>();
- recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
+ Schema avroSchema =
Plc4xCommon.createSchema(responseDataStructure);
+ this.recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
+ } else {
+ this.recordSchema.set(recordSchema);
}
logger.debug("Record schema from PlcReadResponse successfuly
created.");
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
index bef85f4205..a950ac3735 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
public interface Plc4xWriter {
@@ -38,8 +39,8 @@ public interface Plc4xWriter {
* @return the number of rows written to the output stream
* @throws Exception if any errors occur during the writing of the result
set to the output stream
*/
- long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback)
throws Exception;
- long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
FlowFile originalFlowFile) throws Exception;
+ long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema) throws Exception;
+ long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception;
/**
* Returns a map of attribute key/value pairs to be added to any outgoing
flow file(s). The default implementation is to return an empty map.
@@ -72,4 +73,11 @@ public interface Plc4xWriter {
* @return the MIME type string of the output format.
*/
String getMimeType();
+
+
+ /**
+ * Returns the Record Schema used to create the output flowfiles. Used to
store in schema cache.
+ * @return the Record Schema of the output.
+ */
+ RecordSchema getRecordSchema();
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index 06659efcf0..f2535604c1 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -54,9 +54,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
}
@Override
- public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback)
throws Exception {
+ public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema) throws Exception {
if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback);
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
Map<String, String> empty = new HashMap<>();
@@ -72,9 +72,9 @@ public class RecordPlc4xWriter implements Plc4xWriter {
}
@Override
- public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
FlowFile originalFlowFile) throws Exception {
- if (fullRecordSet == null) {
- fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback);
+ public long writePlcReadResponse(PlcReadResponse response, OutputStream
outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback,
RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception {
+ if (fullRecordSet == null) {
+ fullRecordSet = new
Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes,
fullRecordSet.getSchema());
}
try (final RecordSetWriter resultSetWriter =
recordSetWriterFactory.createWriter(logger, writeSchema, outputStream,
originalFlowFile)) {
@@ -142,8 +142,8 @@ public class RecordPlc4xWriter implements Plc4xWriter {
private static class Plc4xReadResponseRecordSetWithCallback extends
Plc4xReadResponseRecordSet {
private final Plc4xReadResponseRowCallback callback;
- public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse
readResponse, Plc4xReadResponseRowCallback callback) throws IOException {
- super(readResponse);
+ public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse
readResponse, Plc4xReadResponseRowCallback callback, RecordSchema recordSchema)
throws IOException {
+ super(readResponse, recordSchema);
this.callback = callback;
}
@Override
@@ -162,4 +162,12 @@ public class RecordPlc4xWriter implements Plc4xWriter {
}
}
+ public RecordSchema getRecordSchema(){
+ try {
+ return this.fullRecordSet.getSchema();
+ } catch (IOException e){
+ return null;
+ }
+ }
+
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
new file mode 100644
index 0000000000..f925aceea0
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.nifi.record;
+
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.plc4x.java.api.model.PlcTag;
+
+public class SchemaCache {
+ private ConcurrentMap<String, SchemaContainer> schemaMap = new
ConcurrentHashMap<>();
+ private AtomicReferenceArray<String> schemaAppendOrder = new
AtomicReferenceArray<>(0);
+ private final AtomicInteger nextSchemaPosition = new AtomicInteger(0);
+ private final AtomicInteger cacheSize = new AtomicInteger(0);
+
+ /** Creates a schema cache with first-in-first-out replacement policy.
Stores PlcTags and RecordSchema used for PlcResponse serialization
+ * @param cacheSize initial cache size
+ */
+ public SchemaCache(int cacheSize) {
+ this.cacheSize.set(cacheSize);
+ }
+
+ /** Empties and restart the cache with the given size
+ * @param cacheSize size of schema cache
+ */
+ public void restartCache(int cacheSize) {
+ this.cacheSize.set(cacheSize);
+ this.schemaAppendOrder = new AtomicReferenceArray<>(cacheSize);
+ this.schemaMap = new ConcurrentHashMap<>();
+ this.nextSchemaPosition.set(0);
+ }
+
+
+ /** Adds the schema to the cache if not present. When the cache is full
first-in-first-out replacement policy applies
+ * @param schemaIdentifier tagName-address map used to store the schema
+ * @param tagsNames list of tag names
+ * @param tagsList list of PlcTag's
+ * @param schema record schema used for PlcResponse serialization. Can be
null
+ */
+ public void addSchema(final Map<String,String> schemaIdentifier, final
LinkedHashSet<String> tagsNames, final List<PlcTag> tagsList, final
RecordSchema schema) {
+ if (!schemaMap.containsKey(schemaIdentifier.toString())){
+ if (nextSchemaPosition.get() == cacheSize.get()){
+ nextSchemaPosition.set(0);
+ }
+ removeSchema(schemaAppendOrder.get(nextSchemaPosition.get()));
+
+ Map<String, PlcTag> tags = new HashMap<>();
+ for (int i=0; i<tagsNames.size(); i++){
+ tags.put(tagsNames.toArray(new String[]{})[i],
tagsList.get(i));
+ }
+ schemaMap.put(schemaIdentifier.toString(), new
SchemaContainer(tags, schema));
+ schemaAppendOrder.set(nextSchemaPosition.get(),
schemaIdentifier.toString());
+ nextSchemaPosition.getAndAdd(1);
+ }
+ }
+
+ /** Removes the schema from the cache
+ * @param schemaIdentifier tagName-address map used to store the schema
+ */
+ public void removeSchema(final String schemaIdentifier) {
+ if (schemaIdentifier == null)
+ return;
+ if (schemaMap.containsKey(schemaIdentifier)){
+ schemaMap.remove(schemaIdentifier);
+ }
+ }
+
+
+ /** Retrieves a schema from the cache if found
+ * @param schemaIdentifier tagName-address map used to store the schema
+ * @return RecordSchema used for PlcResponse serialization. Null if not
found
+ */
+ public RecordSchema retrieveSchema(final Map<String,String>
schemaIdentifier) {
+ if (schemaMap.containsKey(schemaIdentifier.toString())){
+ return schemaMap.get(schemaIdentifier.toString()).getSchema();
+ }
+ return null;
+ }
+
+ /** Retrieves tags from the cache if found
+ * @param schemaIdentifier tagName-address map used to store the schema
+ * @return Map between tag names and the corresponding PlcTag. Null if not
found
+ */
+ public Map<String, PlcTag> retrieveTags(final Map<String,String>
schemaIdentifier) {
+ if (schemaMap.containsKey(schemaIdentifier.toString())){
+ return schemaMap.get(schemaIdentifier.toString()).getTags();
+ }
+ return null;
+ }
+
+ protected int getNextSchemaPosition() {
+ return nextSchemaPosition.get();
+ }
+
+ protected int getCacheSize() {
+ return cacheSize.get();
+ }
+
+ public class SchemaContainer {
+ private RecordSchema schema;
+ private Map<String, PlcTag> tags;
+
+ public Map<String, PlcTag> getTags() {
+ return tags;
+ }
+
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ SchemaContainer(Map<String, PlcTag> tags, RecordSchema schema){
+ this.tags = tags;
+ this.schema = schema;
+ }
+ }
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
index f713f49d6e..bf742a15cf 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
@@ -20,22 +20,49 @@ package org.apache.plc4x.nifi;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.apache.plc4x.nifi.Plc4xSourceProcessor;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.util.Plc4xCommonTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class Plc4xSourceProcessorTest {
private TestRunner testRunner;
+ private static int NUMBER_OF_CALLS = 5;
@BeforeEach
public void init() {
testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+ testRunner.setIncomingConnection(false);
+ testRunner.setValidateExpressionUsage(false);
+
+ testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING,
"simulated://127.0.0.1");
+
+ testRunner.addConnection(Plc4xSourceProcessor.REL_SUCCESS);
+ testRunner.addConnection(Plc4xSourceProcessor.REL_FAILURE);
}
- @Test
public void testProcessor() {
+ testRunner.run(NUMBER_OF_CALLS);
+ testRunner.assertTransferCount(Plc4xSourceProcessor.REL_FAILURE, 0);
+ testRunner.assertTransferCount(Plc4xSourceProcessor.REL_SUCCESS,
NUMBER_OF_CALLS);
+ }
+
+ // Test dynamic properties addresses access strategy
+ @Test
+ public void testWithAddressProperties() {
+
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY,
AddressesAccessUtils.ADDRESS_PROPERTY);
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(k, v));
+ testProcessor();
+ }
+
+ // Test addresses text property access strategy
+ @Test
+ public void testWithAddressText() {
+
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY,
AddressesAccessUtils.ADDRESS_TEXT);
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY,
Plc4xCommonTest.getAddressMap().toString());
+ testProcessor();
}
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
index e508318571..434d22a472 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
@@ -18,12 +18,11 @@
*/
package org.apache.plc4x.nifi;
-import java.util.Map;
-
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
import org.apache.plc4x.nifi.util.Plc4xCommonTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -43,6 +42,7 @@ public class Plc4xSourceRecordProcessorTest {
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_READ_FUTURE_TIMEOUT_MILISECONDS,
"100");
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_CONNECTION_STRING,
"simulated://127.0.0.1");
+
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_SCHEMA_CACHE_SIZE, "1");
testRunner.addConnection(Plc4xSourceRecordProcessor.REL_SUCCESS);
testRunner.addConnection(Plc4xSourceRecordProcessor.REL_FAILURE);
@@ -50,22 +50,9 @@ public class Plc4xSourceRecordProcessorTest {
testRunner.addControllerService("writer", writerService);
testRunner.enableControllerService(writerService);
testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_RECORD_WRITER_FACTORY.getName(),
"writer");
-
- for (Map.Entry<String,String> address
:Plc4xCommonTest.addressMap.entrySet()) {
- // TODO: Random generation not working with this types
- if (address.getValue().startsWith("RANDOM/")) {
- if (address.getValue().endsWith("BYTE") ||
- address.getValue().endsWith("CHAR") ||
- address.getValue().endsWith("STRING"))
- continue;
- }
- testRunner.setProperty(address.getKey(),
address.getValue());
- }
}
- @Test
public void testAvroRecordWriterProcessor() throws InitializationException
{
-
testRunner.run(NUMBER_OF_CALLS,true, true);
//validations
testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE,
0);
@@ -73,4 +60,20 @@ public class Plc4xSourceRecordProcessorTest {
Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceProcessor.REL_SUCCESS),
false, true);
}
+
+ // Test dynamic properties addresses access strategy
+ @Test
+ public void testWithAddressProperties() throws InitializationException {
+
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY,
AddressesAccessUtils.ADDRESS_PROPERTY);
+ Plc4xCommonTest.getAddressMap().forEach((k,v) ->
testRunner.setProperty(k, v));
+ testAvroRecordWriterProcessor();
+ }
+
+ // Test addresses text property access strategy
+ @Test
+ public void testWithAddressText() throws InitializationException {
+
testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY,
AddressesAccessUtils.ADDRESS_TEXT);
+ testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY,
Plc4xCommonTest.getAddressMap().toString());
+ testAvroRecordWriterProcessor();
+ }
}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/record/SchemaCacheTest.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/record/SchemaCacheTest.java
new file mode 100644
index 0000000000..e1d641cefc
--- /dev/null
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/record/SchemaCacheTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.record;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.FieldAssembler;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SchemaCacheTest {
+
+ private static SchemaCache schemaCache = new SchemaCache(0);
+ private static List<RecordSchema> schemas = new ArrayList<>();
+ private static List<List<PlcTag>> tags = new ArrayList<>();
+ private static List<Map<String, String>> addresses = new ArrayList<>();
+ private static List<LinkedHashSet<String>> tagNames = new ArrayList<>();
+
+ @BeforeAll
+ static public void innit() {
+ // Create 5 unique Schemas
+
+ for (int i = 0; i < 5; i++) {
+ FieldAssembler<Schema> fieldAssembler =
SchemaBuilder.builder().record("foo").fields();
+ Collection<String> collection = new HashSet<>();
+ List<PlcTag> innerTags = new ArrayList<>();
+ String address = null;
+
+ switch (i) {
+ case 0:
+ address = "RANDOM/fooBOOL:BOOL";
+ addresses.add(Map.of("BOOL", address));
+ innerTags.add(new TestPlcTag(address));
+ tags.add(innerTags);
+ fieldAssembler.nullableBoolean("BOOL", false);
+ collection.add("fooBOOL");
+ tagNames.add(new LinkedHashSet<>(collection));
+ break;
+ case 1:
+ address = "RANDOM/fooINT:INT";
+ addresses.add(Map.of("INT", address));
+ innerTags.add(new TestPlcTag(address));
+ tags.add(innerTags);
+ fieldAssembler.nullableInt("INT", 1);
+ collection.add("fooINT");
+ tagNames.add(new LinkedHashSet<>(collection));
+ break;
+ case 2:
+ address = "RANDOM/fooLONG:LONG";
+ addresses.add(Map.of("LONG", address));
+ innerTags.add(new TestPlcTag(address));
+ tags.add(innerTags);
+ fieldAssembler.nullableLong("LONG", i * 100);
+ collection.add("fooLONG");
+ tagNames.add(new LinkedHashSet<>(collection));
+ break;
+ case 3:
+ address = "RANDOM/fooFLOAT:FLOAT";
+ addresses.add(Map.of("FLOAT", address));
+ innerTags.add(new TestPlcTag(address));
+ tags.add(innerTags);
+ fieldAssembler.nullableFloat("FLOAT", i * 0.1F);
+ collection.add("fooFLOAT");
+ tagNames.add(new LinkedHashSet<>(collection));
+ break;
+ case 4:
+ address = "RANDOM/fooDOUBLE:DOUBLE";
+ addresses.add(Map.of("DOUBLE", address));
+ innerTags.add(new TestPlcTag(address));
+ tags.add(innerTags);
+ fieldAssembler.nullableDouble("DOUBLE", i * 0.01);
+ collection.add("fooDOUBLE");
+ tagNames.add(new LinkedHashSet<>(collection));
+ break;
+ }
+ Schema avroSchema = fieldAssembler.endRecord();
+ schemas.add(AvroTypeUtil.createSchema(avroSchema));
+ }
+ }
+
+ // Cache size set to 4 < number of schemas: to check schema override.
+ @BeforeEach
+ public void testCacheSize() {
+ schemaCache.restartCache(4);
+ assert schemaCache.getCacheSize() == 4;
+ assert schemaCache.getNextSchemaPosition() == 0;
+ }
+
+ // In this test we add 4 schemas and try to add schema 0 again. It should
not be added.
+ @Test
+ public void testAddSchema() {
+ for (int i = 0; i < 4; i++) {
+ schemaCache.addSchema(addresses.get(i), tagNames.get(i),
tags.get(i), schemas.get(i));
+ assert schemaCache.getNextSchemaPosition() == i + 1;
+ }
+ int prev = schemaCache.getNextSchemaPosition();
+ schemaCache.addSchema(addresses.get(0), tagNames.get(0), tags.get(0),
schemas.get(0));
+ assert prev == schemaCache.getNextSchemaPosition();
+ }
+
+ // In this test check schema overriding
+ @Test
+ public void testSchemaOverride() {
+ for (int i = 0; i < 4; i++) {
+ schemaCache.addSchema(addresses.get(i), tagNames.get(i),
tags.get(i), schemas.get(i));
+ assert schemaCache.getNextSchemaPosition() == i + 1;
+ }
+ // Override first schema
+ schemaCache.addSchema(addresses.get(4), tagNames.get(4), tags.get(4),
schemas.get(4));
+ assert schemaCache.getNextSchemaPosition() == 1;
+
+ // First schema should not be present in the cache
+ assert schemaCache.retrieveSchema(addresses.get(0)) == null;
+
+ // Check remaining schemas
+ for (int i=1; i<5; i++){
+ assert schemaCache.retrieveSchema(addresses.get(i)) ==
schemas.get(i);
+ }
+ }
+
+
+ public static void main(String[] args) {
+ SchemaCacheTest.innit();
+
+ SchemaCacheTest instance = new SchemaCacheTest();
+
+ instance.testCacheSize();
+ instance.testAddSchema();
+
+ instance.testCacheSize();
+ instance.testSchemaOverride();
+ }
+
+
+ private static class TestPlcTag implements PlcTag {
+
+ String address;
+
+ TestPlcTag(String address) {
+ this.address = address;
+ }
+
+ @Override
+ public String getAddressString() {
+ return address;
+ }
+ }
+}
diff --git
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
index 425bd0b30e..eb532ea268 100644
---
a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
+++
b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java
@@ -97,6 +97,20 @@ public class Plc4xCommonTest {
typeMap.put("STRING", Utf8.class);
}
+ public static Map<String, String> getAddressMap(){
+ Map<String, String> result = new HashMap<>();
+
+ addressMap.forEach((k,v) -> {
+ if (!v.startsWith("RANDOM/")) {
+ if (!v.endsWith("BYTE") &&
+ !v.endsWith("CHAR") &&
+ !v.endsWith("STRING"))
+ result.put(k, v);
+ }
+ });
+ return result;
+ }
+
public static void assertAvroContent(List<MockFlowFile> flowfiles, boolean
checkValue, boolean checkType) {
flowfiles.forEach(new Consumer<MockFlowFile>() {
@Override
diff --git a/plc4j/integrations/apache-nifi/pom.xml
b/plc4j/integrations/apache-nifi/pom.xml
index 2f4fd33e0f..65b2f74a11 100644
--- a/plc4j/integrations/apache-nifi/pom.xml
+++ b/plc4j/integrations/apache-nifi/pom.xml
@@ -111,6 +111,11 @@
<artifactId>nifi-avro-record-utils</artifactId>
<version>${nifi.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-utils</artifactId>
+ <version>${nifi.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>