This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3622-update-processors-in-streampipes-processors-transformation-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3622-update-processors-in-streampipes-processors-transformation-jvm
by this push:
new 554561dead refactor(#3622): Update processors to use
IStreamPipesDataProcessor
554561dead is described below
commit 554561deadc8f0e4ce35ab853fb1f8703cf9a310
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri May 23 09:27:18 2025 +0200
refactor(#3622): Update processors to use IStreamPipesDataProcessor
---
.../processor/array/count/CountArrayProcessor.java | 54 ++++----
.../processor/array/split/SplitArrayProcessor.java | 109 +++++++++--------
.../edge/SignalEdgeFilterProcessor.java | 63 +++++-----
.../inverter/BooleanInverterProcessor.java | 57 ++++-----
.../logical/BooleanOperatorProcessor.java | 79 ++++++------
.../state/BooleanToStateProcessor.java | 62 +++++-----
.../timekeeping/BooleanTimekeepingProcessor.java | 88 ++++++-------
.../booloperator/timer/BooleanTimerProcessor.java | 72 +++++------
.../CsvMetadataEnrichmentProcessor.java | 136 +++++++++++++--------
.../datetime/DateTimeFromStringProcessor.java | 78 ++++++------
.../MeasurementUnitConverterProcessor.java | 62 +++++-----
.../jvm/processor/round/RoundProcessor.java | 55 +++++----
.../jvm/processor/state/StateUtils.java | 12 +-
.../state/buffer/StateBufferProcessor.java | 88 ++++++-------
.../buffer/StateBufferLabelerProcessor.java | 102 +++++++---------
.../jvm/processor/task/TaskDurationProcessor.java | 65 +++++-----
16 files changed, 600 insertions(+), 582 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
index 0e72d26823..6008c99117 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
@@ -18,29 +18,29 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.List;
-public class CountArrayProcessor extends StreamPipesDataProcessor {
+public class CountArrayProcessor implements IStreamPipesDataProcessor {
public static final String COUNT_NAME = "countValue";
public static final String ARRAY_FIELD = "array-field";
@@ -48,41 +48,39 @@ public class CountArrayProcessor extends
StreamPipesDataProcessor {
private String arrayField;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.count-array", 0)
- .category(DataProcessorType.COUNT_OPERATOR)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(
- StreamRequirementsBuilder.create()
-
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
- Labels.withId(ARRAY_FIELD), PropertyScope.NONE)
- .build())
-
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(Labels.empty(),
COUNT_NAME,
- SO.NUMBER)))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CountArrayProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.count-array", 0)
+ .category(DataProcessorType.COUNT_OPERATOR)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(
+ StreamRequirementsBuilder.create()
+
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
+ Labels.withId(ARRAY_FIELD), PropertyScope.NONE)
+ .build())
+
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(Labels.empty(),
COUNT_NAME, SO.NUMBER)))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
this.arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD);
}
@Override
- public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
-
+ public void onEvent(Event event, SpOutputCollector collector) {
List<AbstractField> allEvents =
event.getFieldBySelector(arrayField).getAsList().getRawValue();
-
event.addField(CountArrayProcessor.COUNT_NAME, allEvents.size());
-
collector.collect(event);
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
index 742c0243d8..ac924933df 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
@@ -18,13 +18,14 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.array.split;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.AbstractField;
@@ -36,21 +37,20 @@ import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class SplitArrayProcessor extends StreamPipesDataProcessor
- implements
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
-
+public class SplitArrayProcessor
+ implements IStreamPipesDataProcessor,
+
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
public static final String KEEP_PROPERTIES_ID = "keep";
public static final String ARRAY_FIELD_ID = "array-field";
public static final String VALUE = "array_value";
@@ -59,28 +59,36 @@ public class SplitArrayProcessor extends
StreamPipesDataProcessor
private List<String> keepProperties;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.split-array", 0)
- .category(DataProcessorType.TRANSFORM)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(),
- Labels.withId(KEEP_PROPERTIES_ID),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
- Labels.withId(ARRAY_FIELD_ID),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.customTransformation())
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ SplitArrayProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.split-array", 0)
+ .category(DataProcessorType.TRANSFORM)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+
.requiredPropertyWithNaryMapping(
+
EpRequirements.anyProperty(),
+
Labels.withId(KEEP_PROPERTIES_ID),
+ PropertyScope.NONE
+ )
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.listRequirement(),
+
Labels.withId(ARRAY_FIELD_ID),
+ PropertyScope.NONE
+ )
+ .build())
+ .outputStrategy(OutputStrategies.customTransformation())
+ .build()
+ );
}
@Override
- public EventSchema resolveOutputStrategy(DataProcessorInvocation
processingElement,
- ProcessingElementParameterExtractor
extractor)
- throws SpRuntimeException {
+ public EventSchema resolveOutputStrategy(
+ DataProcessorInvocation processingElement,
+ ProcessingElementParameterExtractor extractor
+ ) {
String arrayFieldSelector = extractor.mappingPropertyValue(ARRAY_FIELD_ID);
List<String> keepPropertySelectors =
extractor.mappingPropertyValues(KEEP_PROPERTIES_ID);
@@ -91,8 +99,7 @@ public class SplitArrayProcessor extends
StreamPipesDataProcessor
newProperty.setLabel("Array Value");
newProperty.setDescription("Contains values of the array. Created by Split
Array processor.");
- List<EventProperty> keepProperties = extractor.getEventPropertiesBySelector
- (keepPropertySelectors);
+ List<EventProperty> keepProperties =
extractor.getEventPropertiesBySelector(keepPropertySelectors);
outProperties.add(newProperty);
outProperties.addAll(keepProperties);
@@ -100,32 +107,36 @@ public class SplitArrayProcessor extends
StreamPipesDataProcessor
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD_ID);
- keepProperties =
parameters.extractor().mappingPropertyValues(KEEP_PROPERTIES_ID);
+ public void onPipelineStarted(
+ IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) {
+ arrayField = parameters.extractor()
+ .mappingPropertyValue(ARRAY_FIELD_ID);
+ keepProperties = parameters.extractor()
+ .mappingPropertyValues(KEEP_PROPERTIES_ID);
}
@Override
- public void onEvent(Event event,
- SpOutputCollector collector) throws SpRuntimeException {
-
- List<AbstractField> allEvents =
event.getFieldBySelector(arrayField).getAsList()
- .parseAsCustomType(o -> {
- if (o instanceof NestedField) {
- return o;
- } else if (o instanceof ListField) {
- return o;
- } else {
- return o;
- }
- });
+ public void onEvent(Event event, SpOutputCollector collector) {
+ List<AbstractField> allEvents = event.getFieldBySelector(arrayField)
+ .getAsList()
+ .parseAsCustomType(o -> {
+ if (o instanceof NestedField) {
+ return o;
+ } else if (o instanceof ListField) {
+ return o;
+ } else {
+ return o;
+ }
+ });
for (AbstractField field : allEvents) {
Event outEvent = new Event();
if (field instanceof NestedField) {
- for (Map.Entry<String, AbstractField> key : ((NestedField)
field).getRawValue().entrySet()) {
+ for (Map.Entry<String, AbstractField> key : ((NestedField)
field).getRawValue()
+
.entrySet()) {
outEvent.addField(key.getValue());
}
} else {
@@ -141,7 +152,7 @@ public class SplitArrayProcessor extends
StreamPipesDataProcessor
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
+
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
index efd28c53d4..48b1e9f8d8 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
@@ -18,28 +18,28 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.ArrayList;
import java.util.List;
-public class SignalEdgeFilterProcessor extends StreamPipesDataProcessor {
+public class SignalEdgeFilterProcessor implements IStreamPipesDataProcessor {
public static final String BOOLEAN_SIGNAL_FIELD = "boolean_signal_field";
public static final String FLANK_ID = "flank";
@@ -64,33 +64,35 @@ public class SignalEdgeFilterProcessor extends
StreamPipesDataProcessor {
private boolean edgeDetected;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge",
0)
- .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.FILTER)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(),
Labels.withId(BOOLEAN_SIGNAL_FIELD),
- PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(FLANK_ID),
Options.from(BOTH, FLANK_UP, FLANK_DOWN))
- .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
- .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
- Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
- .outputStrategy(OutputStrategies.keep())
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ SignalEdgeFilterProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge",
0)
+ .category(DataProcessorType.BOOLEAN_OPERATOR,
DataProcessorType.FILTER)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(),
Labels.withId(BOOLEAN_SIGNAL_FIELD),
+ PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(Labels.withId(FLANK_ID),
Options.from(BOTH, FLANK_UP, FLANK_DOWN))
+ .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
+ .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
+ Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
+ .outputStrategy(OutputStrategies.keep())
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
booleanSignalField =
parameters.extractor().mappingPropertyValue(BOOLEAN_SIGNAL_FIELD);
flank = parameters.extractor().selectedSingleValue(FLANK_ID, String.class);
delay = parameters.extractor().singleValueParameter(DELAY_ID,
Integer.class);
eventSelection =
parameters.extractor().selectedSingleValue(EVENT_SELECTION_ID, String.class);
-
this.lastValue = false;
this.delayCount = 0;
this.resultEvents = new ArrayList<>();
@@ -99,39 +101,32 @@ public class SignalEdgeFilterProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event inputEvent,
- SpOutputCollector collector) throws SpRuntimeException {
+ SpOutputCollector collector) {
boolean value =
inputEvent.getFieldBySelector(this.booleanSignalField).getAsPrimitive().getAsBoolean();
-
// Detect edges in signal
if (detectEdge(value, lastValue)) {
this.edgeDetected = true;
this.resultEvents = new ArrayList<>();
this.delayCount = 0;
}
-
if (edgeDetected) {
// Buffer event(s) according to user configuration
addResultEvent(inputEvent);
-
// Detect if the delay has been waited for
if (this.delay == delayCount) {
for (Event event : this.resultEvents) {
collector.collect(event);
}
-
this.edgeDetected = false;
-
} else {
this.delayCount++;
}
}
-
this.lastValue = value;
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
private boolean detectEdge(boolean value, boolean lastValue) {
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
index 6be14a9edc..fc2677b868 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
@@ -18,61 +18,62 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.inverter;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class BooleanInverterProcessor extends StreamPipesDataProcessor {
+public class BooleanInverterProcessor implements IStreamPipesDataProcessor {
public static final String INVERT_FIELD_ID = "invert-field";
private String invertFieldName;
+
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
0)
- .category(DataProcessorType.BOOLEAN_OPERATOR)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.booleanReq(),
- Labels.withId(INVERT_FIELD_ID),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.keep())
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ BooleanInverterProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
0)
+ .category(DataProcessorType.BOOLEAN_OPERATOR)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.booleanReq(),
+ Labels.withId(INVERT_FIELD_ID),
+ PropertyScope.NONE)
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- ProcessingElementParameterExtractor extractor = parameters.extractor();
- this.invertFieldName = extractor.mappingPropertyValue(INVERT_FIELD_ID);
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
+ this.invertFieldName =
parameters.extractor().mappingPropertyValue(INVERT_FIELD_ID);
}
@Override
- public void onEvent(Event inputEvent, SpOutputCollector collector) throws
SpRuntimeException {
+ public void onEvent(Event inputEvent, SpOutputCollector collector) {
boolean field =
inputEvent.getFieldBySelector(invertFieldName).getAsPrimitive().getAsBoolean();
inputEvent.updateFieldBySelector(invertFieldName, !field);
collector.collect(inputEvent);
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
index c367374a9e..a08ea8f201 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
@@ -19,11 +19,13 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical.enums.BooleanOperatorType;
@@ -32,20 +34,19 @@ import
org.apache.streampipes.processors.transformation.jvm.processor.booloperat
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.List;
import static
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical.enums.BooleanOperatorType.NOT;
-public class BooleanOperatorProcessor extends StreamPipesDataProcessor {
+public class BooleanOperatorProcessor implements IStreamPipesDataProcessor {
private static final String BOOLEAN_PROCESSOR_OUT_KEY =
"boolean-operations-result";
private static final String BOOLEAN_OPERATOR_TYPE = "operator-field";
@@ -53,38 +54,42 @@ public class BooleanOperatorProcessor extends
StreamPipesDataProcessor {
private BooleanOperationInputConfigs configs;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.logical",
0)
- .withAssets(ExtensionAssetType.DOCUMENTATION)
- .withLocales(Locales.EN)
- .category(DataProcessorType.BOOLEAN_OPERATOR)
- .requiredStream(
- StreamRequirementsBuilder
- .create()
- .requiredPropertyWithNaryMapping(EpRequirements.booleanReq(),
- Labels.withId(PROPERTIES_LIST), PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(BOOLEAN_OPERATOR_TYPE),
Options.from(
- BooleanOperatorType.AND.operator(),
- BooleanOperatorType.OR.operator(),
- BooleanOperatorType.NOT.operator(),
- BooleanOperatorType.XOR.operator(),
- BooleanOperatorType.X_NOR.operator(),
- BooleanOperatorType.NOR.operator()))
- .outputStrategy(OutputStrategies.append(
- PrimitivePropertyBuilder.create(
- Datatypes.Boolean, BOOLEAN_PROCESSOR_OUT_KEY)
- .build())
- )
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ BooleanOperatorProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.logical",
0)
+ .withAssets(ExtensionAssetType.DOCUMENTATION)
+ .withLocales(Locales.EN)
+ .category(DataProcessorType.BOOLEAN_OPERATOR)
+ .requiredStream(
+ StreamRequirementsBuilder
+ .create()
+
.requiredPropertyWithNaryMapping(EpRequirements.booleanReq(),
+ Labels.withId(PROPERTIES_LIST), PropertyScope.NONE)
+ .build())
+
.requiredSingleValueSelection(Labels.withId(BOOLEAN_OPERATOR_TYPE),
Options.from(
+ BooleanOperatorType.AND.operator(),
+ BooleanOperatorType.OR.operator(),
+ BooleanOperatorType.NOT.operator(),
+ BooleanOperatorType.XOR.operator(),
+ BooleanOperatorType.X_NOR.operator(),
+ BooleanOperatorType.NOR.operator()))
+ .outputStrategy(OutputStrategies.append(
+ PrimitivePropertyBuilder.create(
+ Datatypes.Boolean, BOOLEAN_PROCESSOR_OUT_KEY)
+ .build())
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams processorParams, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext
eventProcessorRuntimeContext) throws SpRuntimeException {
- List<String> properties =
processorParams.extractor().mappingPropertyValues(PROPERTIES_LIST);
- String operator =
processorParams.extractor().selectedSingleValue(BOOLEAN_OPERATOR_TYPE,
String.class);
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
+ List<String> properties =
parameters.extractor().mappingPropertyValues(PROPERTIES_LIST);
+ String operator =
parameters.extractor().selectedSingleValue(BOOLEAN_OPERATOR_TYPE, String.class);
BooleanOperationInputConfigs configs =
new BooleanOperationInputConfigs(properties,
BooleanOperatorType.getBooleanOperatorType(operator));
preChecks(configs);
@@ -92,7 +97,7 @@ public class BooleanOperatorProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onEvent(Event event, SpOutputCollector spOutputCollector) throws
SpRuntimeException {
+ public void onEvent(Event event, SpOutputCollector spOutputCollector) {
List<String> properties = configs.getProperties();
BooleanOperatorType operatorType = configs.getOperator();
Boolean firstProperty =
event.getFieldBySelector(properties.get(0)).getAsPrimitive().getAsBoolean();
@@ -101,24 +106,21 @@ public class BooleanOperatorProcessor extends
StreamPipesDataProcessor {
if (properties.size() == 1) {
// support for NOT operator
result = boolOperation.evaluate(firstProperty, firstProperty);
-
} else {
Boolean secondProperty =
event.getFieldBySelector(properties.get(1)).getAsPrimitive().getAsBoolean();
result = boolOperation.evaluate(firstProperty, secondProperty);
-
//loop through rest of the properties to get final result
for (int i = 2; i < properties.size(); i++) {
result =
boolOperation.evaluate(result,
event.getFieldBySelector(properties.get(i)).getAsPrimitive().getAsBoolean());
}
-
}
event.addField(BOOLEAN_PROCESSOR_OUT_KEY, result);
spOutputCollector.collect(event);
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
configs = null;
}
@@ -132,3 +134,4 @@ public class BooleanOperatorProcessor extends
StreamPipesDataProcessor {
}
}
}
+
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
index 2736cc7864..81f5f5e1a8 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
@@ -19,15 +19,18 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
@@ -36,8 +39,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
@@ -46,7 +47,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
-public class BooleanToStateProcessor extends StreamPipesDataProcessor {
+public class BooleanToStateProcessor implements IStreamPipesDataProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(BooleanToStateProcessor.class);
@@ -67,38 +68,39 @@ public class BooleanToStateProcessor extends
StreamPipesDataProcessor {
+ "}";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state",
0)
- .category(DataProcessorType.BOOLEAN_OPERATOR)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithNaryMapping(EpRequirements.booleanReq(),
Labels.withId(BOOLEAN_STATE_FIELD),
- PropertyScope.NONE)
- .build())
- .requiredTextParameter(Labels.withId(DEFAULT_STATE_ID))
- .requiredCodeblock(Labels.withId(JSON_CONFIGURATION),
CodeLanguage.Javascript, defaultSkeleton)
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(Labels.withId(CURRENT_STATE), CURRENT_STATE,
SPSensor.STATE)
- ))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ BooleanToStateProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state",
0)
+ .category(DataProcessorType.BOOLEAN_OPERATOR)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithNaryMapping(EpRequirements.booleanReq(),
Labels.withId(BOOLEAN_STATE_FIELD),
+ PropertyScope.NONE)
+ .build())
+ .requiredTextParameter(Labels.withId(DEFAULT_STATE_ID))
+ .requiredCodeblock(Labels.withId(JSON_CONFIGURATION),
CodeLanguage.Javascript, defaultSkeleton)
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(Labels.withId(CURRENT_STATE),
CURRENT_STATE, SPSensor.STATE)
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
stateFields = extractor.mappingPropertyValues(BOOLEAN_STATE_FIELD);
defaultState = extractor.singleValueParameter(DEFAULT_STATE_ID,
String.class);
String jsonConfigurationString =
extractor.codeblockValue(JSON_CONFIGURATION);
-
try {
jsonConfigurationString =
jsonConfigurationString.replaceAll("(?m)^//.*", "");
jsonConfiguration =
JacksonSerializer.getObjectMapper().readValue(jsonConfigurationString,
Map.class);
-
} catch (JsonProcessingException e) {
LOG.info("Error when parsing the json configuration: " +
jsonConfigurationString);
throw new SpRuntimeException("The following mapping configuration is not
valid: " + jsonConfigurationString);
@@ -107,26 +109,22 @@ public class BooleanToStateProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event,
- SpOutputCollector collector) throws SpRuntimeException {
+ SpOutputCollector collector) {
String state = this.defaultState;
-
for (String stateField : stateFields) {
if
(event.getFieldBySelector(stateField).getAsPrimitive().getAsBoolean().equals(true))
{
state = event.getFieldBySelector(stateField).getFieldNameIn();
}
}
-
- // replace the state string when user provided a mapping
if (this.jsonConfiguration.containsKey(state)) {
state = this.jsonConfiguration.get(state);
}
-
event.addField(BooleanToStateProcessor.CURRENT_STATE, state);
collector.collect(event);
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
+
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
index 71be83b2a9..97e1f26ed7 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
@@ -18,28 +18,28 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.LinkedList;
-public class BooleanTimekeepingProcessor extends StreamPipesDataProcessor {
+public class BooleanTimekeepingProcessor implements IStreamPipesDataProcessor {
// Measures time and returns count
public static final String LEFT_FIELD_ID = "left-field";
@@ -65,42 +65,44 @@ public class BooleanTimekeepingProcessor extends
StreamPipesDataProcessor {
private double outputDivisor;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timekeeping",
0)
- .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.TIME)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON,
"time_measure_example.png")
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.booleanReq(),
- Labels.withId(LEFT_FIELD_ID),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.booleanReq(),
- Labels.withId(RIGHT_FIELD_ID),
- PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
Options.from(MILLISECONDS, SECONDS, MINUTES))
- .outputStrategy(OutputStrategies.append(
- EpProperties.numberEp(Labels.withId(TIME_FIELD_ID),
- "measured_time",
- "http://schema.org/Number"),
- EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID),
- "counter",
- "http://schema.org/Number")
-
- ))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ BooleanTimekeepingProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timekeeping",
0)
+ .category(DataProcessorType.BOOLEAN_OPERATOR,
DataProcessorType.TIME)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON, "time_measure_example.png")
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.booleanReq(),
+ Labels.withId(LEFT_FIELD_ID),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.booleanReq(),
+ Labels.withId(RIGHT_FIELD_ID),
+ PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
Options.from(MILLISECONDS, SECONDS, MINUTES))
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.numberEp(Labels.withId(TIME_FIELD_ID),
+ "measured_time",
+ "http://schema.org/Number"),
+ EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID),
+ "counter",
+ "http://schema.org/Number")
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
leftFieldName = extractor.mappingPropertyValue(LEFT_FIELD_ID);
- rightFieldName = extractor.mappingPropertyValue(LEFT_FIELD_ID);
+ rightFieldName = extractor.mappingPropertyValue(RIGHT_FIELD_ID);
String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID,
String.class);
leftFieldLast = false;
rightFieldLast = false;
@@ -116,40 +118,30 @@ public class BooleanTimekeepingProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onEvent(Event inputEvent, SpOutputCollector collector) throws
SpRuntimeException {
+ public void onEvent(Event inputEvent, SpOutputCollector collector) {
boolean leftField =
inputEvent.getFieldBySelector(leftFieldName).getAsPrimitive().getAsBoolean();
boolean rightField =
inputEvent.getFieldBySelector(rightFieldName).getAsPrimitive().getAsBoolean();
-
if (!rightFieldLast && rightField) {
if (this.allPending.size() > 0) {
Long startTime = this.allPending.removeLast();
-
Long timeDifference = System.currentTimeMillis() - startTime;
-
double result = timeDifference / this.outputDivisor;
-
this.counter++;
-
if (this.counter == Long.MAX_VALUE) {
this.counter = 0L;
}
-
inputEvent.addField("measured_time", result);
inputEvent.addField("counter", this.counter);
-
collector.collect(inputEvent);
}
}
-
-
if (!leftFieldLast && leftField) {
this.allPending.push(System.currentTimeMillis());
}
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
index 26eb03588f..430fe41bb8 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
@@ -18,26 +18,26 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class BooleanTimerProcessor extends StreamPipesDataProcessor {
+public class BooleanTimerProcessor implements IStreamPipesDataProcessor {
public static final String FIELD_ID = "field";
public static final String TIMER_FIELD_ID = "timerField";
@@ -58,47 +58,45 @@ public class BooleanTimerProcessor extends
StreamPipesDataProcessor {
private double outputDivisor;
-
-
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timer",
0)
- .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.TIME)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.booleanReq(),
- Labels.withId(FIELD_ID),
- PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(TIMER_FIELD_ID),
Options.from(TRUE, FALSE))
- .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
Options.from(MILLISECONDS, SECONDS, MINUTES))
- .outputStrategy(OutputStrategies.append(
- EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID),
- "measured_time",
- "http://schema.org/Number")
- ))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ BooleanTimerProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timer",
0)
+ .category(DataProcessorType.BOOLEAN_OPERATOR,
DataProcessorType.TIME)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.booleanReq(),
+ Labels.withId(FIELD_ID),
+ PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(Labels.withId(TIMER_FIELD_ID),
Options.from(TRUE, FALSE))
+ .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
Options.from(MILLISECONDS, SECONDS, MINUTES))
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID),
+ "measured_time",
+ "http://schema.org/Number")
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
-
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
fieldName = extractor.mappingPropertyValue(FIELD_ID);
String measureTrueString = extractor.selectedSingleValue(TIMER_FIELD_ID,
String.class);
String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID,
String.class);
-
measureTrue = false;
timestamp = Long.MIN_VALUE;
if (measureTrueString.equals(TRUE)) {
measureTrue = true;
}
-
outputDivisor = 1.0;
if (outputUnit.equals(SECONDS)) {
outputDivisor = 1000.0;
@@ -109,9 +107,8 @@ public class BooleanTimerProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event inputEvent,
- SpOutputCollector collector) throws SpRuntimeException {
+ SpOutputCollector collector) {
boolean field =
inputEvent.getFieldBySelector(this.fieldName).getAsPrimitive().getAsBoolean();
-
if (this.measureTrue == field) {
if (timestamp == Long.MIN_VALUE) {
timestamp = System.currentTimeMillis();
@@ -119,9 +116,7 @@ public class BooleanTimerProcessor extends
StreamPipesDataProcessor {
} else {
if (timestamp != Long.MIN_VALUE) {
Long difference = System.currentTimeMillis() - timestamp;
-
double result = difference / this.outputDivisor;
-
inputEvent.addField("measured_time", result);
timestamp = Long.MIN_VALUE;
collector.collect(inputEvent);
@@ -130,7 +125,6 @@ public class BooleanTimerProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
index d4ea565869..61ff8c374b 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
@@ -27,14 +27,16 @@ import
org.apache.streampipes.commons.parser.PrimitiveTypeParser;
import org.apache.streampipes.commons.parser.StringParser;
import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
@@ -43,6 +45,7 @@ import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Filetypes;
@@ -51,8 +54,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.Tuple2;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
@@ -68,12 +69,13 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
+import static org.apache.streampipes.processors.transformation.jvm.processor
+ .csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
public class CsvMetadataEnrichmentProcessor
- extends StreamPipesDataProcessor
- implements ResolvesContainerProvidedOptions,
- ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
+ implements IStreamPipesDataProcessor,
+ ResolvesContainerProvidedOptions,
+
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
private static final Logger LOG =
LoggerFactory.getLogger(CsvMetadataEnrichmentProcessor.class);
@@ -88,30 +90,40 @@ public class CsvMetadataEnrichmentProcessor
private Map<String, CSVRecord> columnMap;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.csvmetadata", 0)
- .category(DataProcessorType.ENRICH)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.anyProperty(),
- Labels.withId(MAPPING_FIELD_KEY),
- PropertyScope.NONE)
- .build())
- .requiredFile(Labels.withId(CSV_FILE_KEY), Filetypes.CSV)
-
.requiredSingleValueSelectionFromContainer(Labels.withId(FIELD_TO_MATCH),
- Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY))
-
.requiredMultiValueSelectionFromContainer(Labels.withId(FIELDS_TO_APPEND_KEY),
- Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY, FIELD_TO_MATCH))
- .outputStrategy(OutputStrategies.customTransformation())
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CsvMetadataEnrichmentProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.csvmetadata", 0)
+ .category(DataProcessorType.ENRICH)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.anyProperty(),
+
Labels.withId(MAPPING_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .requiredFile(Labels.withId(CSV_FILE_KEY), Filetypes.CSV)
+ .requiredSingleValueSelectionFromContainer(
+ Labels.withId(FIELD_TO_MATCH),
+ Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY)
+ )
+ .requiredMultiValueSelectionFromContainer(
+ Labels.withId(FIELDS_TO_APPEND_KEY),
+ Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY, FIELD_TO_MATCH)
+ )
+ .outputStrategy(OutputStrategies.customTransformation())
+ .build()
+ );
}
@Override
- public List<Option> resolveOptions(String requestId,
- IStaticPropertyExtractor
parameterExtractor) {
+ public List<Option> resolveOptions(
+ String requestId,
+ IStaticPropertyExtractor parameterExtractor
+ ) {
try {
String fileContents = getFileContents(parameterExtractor);
if (requestId.equals(FIELDS_TO_APPEND_KEY)) {
@@ -127,8 +139,10 @@ public class CsvMetadataEnrichmentProcessor
}
@Override
- public EventSchema resolveOutputStrategy(DataProcessorInvocation
processingElement,
- ProcessingElementParameterExtractor
parameterExtractor)
+ public EventSchema resolveOutputStrategy(
+ DataProcessorInvocation processingElement,
+ ProcessingElementParameterExtractor parameterExtractor
+ )
throws SpRuntimeException {
List<EventProperty> properties = processingElement
.getInputStreams()
@@ -136,8 +150,10 @@ public class CsvMetadataEnrichmentProcessor
.getEventSchema()
.getEventProperties();
- List<String> columnsToInclude =
parameterExtractor.selectedMultiValues(FIELDS_TO_APPEND_KEY,
- String.class);
+ List<String> columnsToInclude = parameterExtractor.selectedMultiValues(
+ FIELDS_TO_APPEND_KEY,
+ String.class
+ );
try {
String fileContents = getFileContents(parameterExtractor);
properties.addAll(getAppendProperties(fileContents, columnsToInclude));
@@ -148,8 +164,10 @@ public class CsvMetadataEnrichmentProcessor
return new EventSchema(properties);
}
- private List<EventProperty> getAppendProperties(String fileContents,
- List<String>
columnsToInclude) throws IOException {
+ private List<EventProperty> getAppendProperties(
+ String fileContents,
+ List<String> columnsToInclude
+ ) throws IOException {
CSVParser parser = getCsvParser(fileContents);
List<EventProperty> propertiesToAppend = new ArrayList<>();
List<CSVRecord> records = parser.getRecords();
@@ -166,8 +184,10 @@ public class CsvMetadataEnrichmentProcessor
return CsvMetadataEnrichmentUtils.getGuessedEventProperty(column,
firstRecord);
}
- private List<Option> getOptionsFromColumnNames(String fileContents,
- List<String> columnsToIgnore)
throws IOException {
+ private List<Option> getOptionsFromColumnNames(
+ String fileContents,
+ List<String> columnsToIgnore
+ ) throws IOException {
return getColumnNames(fileContents, columnsToIgnore)
.stream()
.map(Option::new)
@@ -180,13 +200,15 @@ public class CsvMetadataEnrichmentProcessor
.getHeaderMap()
.keySet()
.stream()
- .filter(key -> columnsToIgnore.stream().noneMatch(c -> c.equals(key)))
+ .filter(key -> columnsToIgnore.stream()
+ .noneMatch(c -> c.equals(key)))
.collect(Collectors.toList());
}
private String getFileContents(IParameterExtractor extractor) {
String filename = extractor.selectedFilename(CSV_FILE_KEY);
- return
getStreamPipesClientInstance().fileApi().getFileContentAsString(filename);
+ return getStreamPipesClientInstance().fileApi()
+ .getFileContentAsString(filename);
}
private StreamPipesClient getStreamPipesClientInstance() {
@@ -194,15 +216,16 @@ public class CsvMetadataEnrichmentProcessor
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) {
var extractor = parameters.extractor();
mappingFieldSelector = extractor.mappingPropertyValue(MAPPING_FIELD_KEY);
List<String> fieldsToAppend =
extractor.selectedMultiValues(FIELDS_TO_APPEND_KEY, String.class);
matchingColumn = extractor.selectedSingleValue(FIELD_TO_MATCH,
String.class);
String fileContents = getFileContents(extractor);
-
try {
makeColumnMap(fileContents);
} catch (IOException e) {
@@ -211,7 +234,14 @@ public class CsvMetadataEnrichmentProcessor
if (!this.columnMap.isEmpty()) {
this.columnsToAppend = fieldsToAppend
.stream()
- .map(c -> makeParser(c,
this.columnMap.entrySet().stream().findFirst().get().getValue()))
+ .map(c -> makeParser(
+ c,
+ this.columnMap.entrySet()
+ .stream()
+ .findFirst()
+ .get()
+ .getValue()
+ ))
.collect(Collectors.toList());
} else {
LOG.warn("Could not find any rows, does the CSV file contain data?");
@@ -220,9 +250,11 @@ public class CsvMetadataEnrichmentProcessor
}
@Override
- public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ public void onEvent(Event event, SpOutputCollector collector) {
String lookupValue =
-
event.getFieldBySelector(mappingFieldSelector).getAsPrimitive().getAsString();
+ event.getFieldBySelector(mappingFieldSelector)
+ .getAsPrimitive()
+ .getAsString();
CSVRecord record = this.columnMap.get(lookupValue);
for (Tuple2<String, PrimitiveTypeParser> columnToAppend : columnsToAppend)
{
event.addField(columnToAppend.k, getRecordValueOrDefault(record,
columnToAppend));
@@ -231,12 +263,14 @@ public class CsvMetadataEnrichmentProcessor
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
this.columnMap = new HashMap<>();
}
- private Object getRecordValueOrDefault(CSVRecord record, Tuple2<String,
- PrimitiveTypeParser> columnToAppend) {
+ private Object getRecordValueOrDefault(
+ CSVRecord record, Tuple2<String,
+ PrimitiveTypeParser> columnToAppend
+ ) {
if (record != null) {
return columnToAppend.v.parse(record.get(columnToAppend.k));
} else {
@@ -244,8 +278,10 @@ public class CsvMetadataEnrichmentProcessor
}
}
- private Tuple2<String, PrimitiveTypeParser> makeParser(String columnName,
- CSVRecord record) {
+ private Tuple2<String, PrimitiveTypeParser> makeParser(
+ String columnName,
+ CSVRecord record
+ ) {
Datatypes columnDatatype =
CsvMetadataEnrichmentUtils.getGuessDatatype(columnName, record);
if (columnDatatype.equals(Datatypes.Float)) {
return new Tuple2<>(columnName, new FloatParser());
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
index 7c4e2801aa..5e6bc253a0 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
@@ -18,17 +18,18 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.datetime;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -36,8 +37,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -48,7 +47,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class DateTimeFromStringProcessor extends StreamPipesDataProcessor {
+public class DateTimeFromStringProcessor implements IStreamPipesDataProcessor {
public static final String FIELD_ID = "inputField";
public static final String OUTPUT_TIMESTAMP_RUNTIME_NAME =
"timestringInMillis";
@@ -59,42 +58,42 @@ public class DateTimeFromStringProcessor extends
StreamPipesDataProcessor {
private String selectedTimeZone;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.datetime", 0)
- .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
-
.requiredPropertyWithUnaryMapping(
-
EpRequirements.stringReq(),
- Labels.withId(FIELD_ID),
- PropertyScope.NONE
- )
- .build())
- .requiredSingleValueSelection(Labels.withId(INPUT_TIMEZONE_KEY),
- Options.from(getTimeZoneOptions()), true
- )
- .outputStrategy(
- OutputStrategies.append(
- EpProperties.timestampProperty(OUTPUT_TIMESTAMP_RUNTIME_NAME),
- EpProperties.stringEp(
- // We can use the labels from the input timezone here
- Labels.withId(INPUT_TIMEZONE_KEY),
- OUTPUT_TIMEZONE_RUNTIME_NAME,
- SO.SCHEDULE_TIMEZONE
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ DateTimeFromStringProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.datetime", 0)
+ .category(DataProcessorType.STRING_OPERATOR,
DataProcessorType.TIME)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.stringReq(),
+ Labels.withId(FIELD_ID),
+ PropertyScope.NONE
+ )
+ .build())
+ .requiredSingleValueSelection(Labels.withId(INPUT_TIMEZONE_KEY),
+ Options.from(getTimeZoneOptions()), true)
+ .outputStrategy(
+ OutputStrategies.append(
+
EpProperties.timestampProperty(OUTPUT_TIMESTAMP_RUNTIME_NAME),
+ EpProperties.stringEp(
+ Labels.withId(INPUT_TIMEZONE_KEY),
+ OUTPUT_TIMEZONE_RUNTIME_NAME,
+ SO.SCHEDULE_TIMEZONE
+ )
)
)
- )
- .build();
+ .build()
+ );
}
@Override
- public void onInvocation(
- ProcessorParams parameters, SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext
- ) throws SpRuntimeException {
- ProcessingElementParameterExtractor extractor = parameters.extractor();
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
+ var extractor = parameters.extractor();
this.streamInputDateTimeFieldName =
extractor.mappingPropertyValue(FIELD_ID);
this.selectedTimeZone = extractor.selectedSingleValue(INPUT_TIMEZONE_KEY,
String.class);
}
@@ -106,7 +105,6 @@ public class DateTimeFromStringProcessor extends
StreamPipesDataProcessor {
.getAsString();
DateTimeFormatter dtFormatter = DateTimeFormatter.ISO_DATE_TIME;
ZonedDateTime zdt = parseDateTime(dateTimeString, dtFormatter);
-
/*
* A temporary workaround is in place to put a long represent the
* zonedDateTimeVariable One possible workaround is to use the time zone
and the
@@ -119,13 +117,11 @@ public class DateTimeFromStringProcessor extends
StreamPipesDataProcessor {
.toEpochMilli()
);
event.addField(OUTPUT_TIMEZONE_RUNTIME_NAME, selectedTimeZone);
-
collector.collect(event);
}
@Override
- public void onDetach() {
-
+ public void onPipelineStopped() {
}
private ZonedDateTime parseDateTime(String dateTimeString, DateTimeFormatter
dtf) {
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
index b61742f89b..bea2703d43 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
@@ -21,11 +21,13 @@ package
org.apache.streampipes.processors.transformation.jvm.processor.measureme
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
@@ -35,13 +37,12 @@ import
org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticP
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.PropertyRequirementsBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.TransformOperations;
import org.apache.streampipes.units.UnitProvider;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import com.github.jqudt.Quantity;
import com.github.jqudt.Unit;
@@ -51,8 +52,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
-public class MeasurementUnitConverterProcessor extends StreamPipesDataProcessor
- implements ResolvesContainerProvidedOptions {
+public class MeasurementUnitConverterProcessor implements
IStreamPipesDataProcessor,
+ ResolvesContainerProvidedOptions {
private static final String CONVERT_PROPERTY = "convert-property";
private static final String OUTPUT_UNIT = "output-unit";
@@ -63,35 +64,39 @@ public class MeasurementUnitConverterProcessor extends
StreamPipesDataProcessor
private String convertProperty;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.measurementunitconverter",
0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(PropertyRequirementsBuilder
- .create()
- .measurementUnitPresence()
- .build(),
- Labels.withId(CONVERT_PROPERTY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .requiredSingleValueSelectionFromContainer(Labels.withId(OUTPUT_UNIT))
- .outputStrategy(OutputStrategies.transform(TransformOperations
- .dynamicMeasurementUnitTransformation(CONVERT_PROPERTY,
OUTPUT_UNIT)))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ MeasurementUnitConverterProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.measurementunitconverter",
0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(PropertyRequirementsBuilder
+ .create()
+ .measurementUnitPresence()
+ .build(),
+ Labels.withId(CONVERT_PROPERTY),
+ PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+
.requiredSingleValueSelectionFromContainer(Labels.withId(OUTPUT_UNIT))
+ .outputStrategy(OutputStrategies.transform(TransformOperations
+ .dynamicMeasurementUnitTransformation(CONVERT_PROPERTY,
OUTPUT_UNIT)))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
this.convertProperty = extractor.mappingPropertyValue(CONVERT_PROPERTY);
String runtimeName =
extractor.getEventPropertyBySelector(this.convertProperty).getRuntimeName();
String inputUnitId = extractor.measurementUnit(runtimeName, 0);
- String outputUnitId =
parameters.getGraph().getStaticProperties().stream().filter(sp -> sp
+ String outputUnitId =
parameters.getModel().getStaticProperties().stream().filter(sp -> sp
.getInternalName().equals(OUTPUT_UNIT))
.map(sp ->
(RuntimeResolvableOneOfStaticProperty) sp)
@@ -105,7 +110,7 @@ public class MeasurementUnitConverterProcessor extends
StreamPipesDataProcessor
}
@Override
- public void onEvent(Event in, SpOutputCollector out) throws
SpRuntimeException {
+ public void onEvent(Event in, SpOutputCollector out) {
double value =
in.getFieldBySelector(convertProperty).getAsPrimitive().getAsDouble();
// transform old value to new unit
@@ -120,8 +125,7 @@ public class MeasurementUnitConverterProcessor extends
StreamPipesDataProcessor
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
@Override
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
index 51fc61494a..9885494637 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
@@ -18,23 +18,23 @@
package org.apache.streampipes.processors.transformation.jvm.processor.round;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.math.BigDecimal;
import java.math.RoundingMode;
@@ -42,7 +42,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class RoundProcessor extends StreamPipesDataProcessor {
+public class RoundProcessor implements IStreamPipesDataProcessor {
private List<String> fieldsToBeRounded;
private int numDigits;
private String roundingMode;
@@ -63,33 +63,37 @@ public class RoundProcessor extends
StreamPipesDataProcessor {
};
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
- .create("org.apache.streampipes.processors.transformation.jvm.round",
0)
- .category(DataProcessorType.TRANSFORM)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithNaryMapping(EpRequirements.numberReq(),
Labels.withId(FIELDS), PropertyScope.NONE)
- .build())
- .requiredIntegerParameter(Labels.withId(NUM_DIGITS))
- .requiredSingleValueSelection(Labels.withId(ROUNDING_MODE),
- Options.from(ROUNDING_MODE_MAP.keySet().toArray(new String[0])))
- .outputStrategy(OutputStrategies.keep())
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ RoundProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.round", 0)
+ .category(DataProcessorType.TRANSFORM)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithNaryMapping(EpRequirements.numberReq(),
Labels.withId(FIELDS), PropertyScope.NONE)
+ .build())
+ .requiredIntegerParameter(Labels.withId(NUM_DIGITS))
+ .requiredSingleValueSelection(Labels.withId(ROUNDING_MODE),
+ Options.from(ROUNDING_MODE_MAP.keySet().toArray(new
String[0])))
+ .outputStrategy(OutputStrategies.keep())
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
fieldsToBeRounded = parameters.extractor().mappingPropertyValues(FIELDS);
numDigits = parameters.extractor().singleValueParameter(NUM_DIGITS,
Integer.class);
roundingMode = parameters.extractor().selectedSingleValue(ROUNDING_MODE,
String.class);
}
@Override
- public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ public void onEvent(Event event, SpOutputCollector collector) {
for (String fieldToBeRounded : fieldsToBeRounded) {
double value =
event.getFieldBySelector(fieldToBeRounded).getAsPrimitive().getAsDouble();
double roundedValue =
@@ -100,6 +104,7 @@ public class RoundProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() {
+ public void onPipelineStopped() {
}
-}
\ No newline at end of file
+}
+
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
index 1233aed153..744b9828e2 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
@@ -18,11 +18,11 @@
package org.apache.streampipes.processors.transformation.jvm.processor.state;
+import
org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import java.util.List;
import java.util.stream.Collectors;
@@ -36,15 +36,15 @@ public class StateUtils {
public static final String LABEL_STRING_ID = "labelStringId";
- public static String getLabelName(ProcessingElementParameterExtractor
extractor) {
+ public static String getLabelName(IDataProcessorParameterExtractor
extractor) {
return extractor.textParameter(LABEL_NAME);
}
- public static List<StaticPropertyGroup>
getGroupItems(ProcessingElementParameterExtractor extractor) {
+ public static List<StaticPropertyGroup>
getGroupItems(IDataProcessorParameterExtractor extractor) {
return extractor.collectionMembersAsGroup(LABEL_COLLECTION_ID);
}
- public static List<Double>
getNumberValues(ProcessingElementParameterExtractor extractor) {
+ public static List<Double> getNumberValues(IDataProcessorParameterExtractor
extractor) {
return getGroupItems(extractor)
.stream()
.map(group -> (
@@ -56,7 +56,7 @@ public class StateUtils {
.collect(Collectors.toList());
}
- public static List<String>
getLabelStrings(ProcessingElementParameterExtractor extractor) {
+ public static List<String> getLabelStrings(IDataProcessorParameterExtractor
extractor) {
return getGroupItems(extractor)
.stream()
.map(group -> (extractor
@@ -66,7 +66,7 @@ public class StateUtils {
.collect(Collectors.toList());
}
- public static List<String>
getComparators(ProcessingElementParameterExtractor extractor) {
+ public static List<String> getComparators(IDataProcessorParameterExtractor
extractor) {
return getGroupItems(extractor)
.stream()
.map(group -> (extractor
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
index 734ff69d09..f706339422 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
@@ -18,15 +18,17 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,16 +36,13 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class StateBufferProcessor extends StreamPipesDataProcessor {
+public class StateBufferProcessor implements IStreamPipesDataProcessor {
public static final String TIMESTAMP_FIELD_ID = "timestampId";
public static final String STATE_FIELD_ID = "stateId";
@@ -59,40 +58,42 @@ public class StateBufferProcessor extends
StreamPipesDataProcessor {
private Map<String, List> stateBuffer;
-
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.buffer",
0)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_FIELD_ID),
- PropertyScope.HEADER_PROPERTY)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.semanticTypeReqList(SPSensor.STATE),
- Labels.withId(STATE_FIELD_ID),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.numberReq(),
- Labels.withId(SENSOR_VALUE_FIELD_ID),
- PropertyScope.MEASUREMENT_PROPERTY)
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ StateBufferProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.buffer",
0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_FIELD_ID),
+ PropertyScope.HEADER_PROPERTY)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReqList(SPSensor.STATE),
+ Labels.withId(STATE_FIELD_ID),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(SENSOR_VALUE_FIELD_ID),
+ PropertyScope.MEASUREMENT_PROPERTY)
+ .build()
+ )
+ .outputStrategy(OutputStrategies.fixed(
+ EpProperties.timestampProperty(TIMESTAMP),
+ EpProperties.listDoubleEp(Labels.withId(VALUES), VALUES,
SO.NUMBER),
+ EpProperties.listStringEp(Labels.withId(STATE), STATE,
SPSensor.STATE)
+ ))
.build()
- )
- .outputStrategy(OutputStrategies.fixed(
- EpProperties.timestampProperty(TIMESTAMP),
- EpProperties.listDoubleEp(Labels.withId(VALUES), VALUES,
SO.NUMBER),
- EpProperties.listStringEp(Labels.withId(STATE), STATE,
SPSensor.STATE)
- ))
- .build();
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
timeProperty = extractor.mappingPropertyValue(TIMESTAMP_FIELD_ID);
stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
@@ -101,21 +102,14 @@ public class StateBufferProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onEvent(Event inputEvent,
- SpOutputCollector collector) throws SpRuntimeException {
+ public void onEvent(Event inputEvent, SpOutputCollector collector) {
long timestamp =
inputEvent.getFieldBySelector(this.timeProperty).getAsPrimitive().getAsLong();
List<String> states =
inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
double value =
inputEvent.getFieldBySelector(this.sensorValueProperty).getAsPrimitive().getAsDouble();
// add value to state buffer
for (String state : states) {
- if (stateBuffer.containsKey(state)) {
- stateBuffer.get(state).add(value);
- } else {
- List tmp = new ArrayList();
- tmp.add(value);
- stateBuffer.put(state, tmp);
- }
+ stateBuffer.computeIfAbsent(state, k -> new ArrayList<>()).add(value);
}
// emit event if state is not in event anymore
@@ -124,20 +118,18 @@ public class StateBufferProcessor extends
StreamPipesDataProcessor {
if (!states.contains(key)) {
Event resultEvent = new Event();
resultEvent.addField(StateBufferProcessor.VALUES,
stateBuffer.get(key));
- resultEvent.addField(StateBufferProcessor.STATE, Arrays.asList(key));
+ resultEvent.addField(StateBufferProcessor.STATE, List.of(key));
resultEvent.addField(StateBufferProcessor.TIMESTAMP, timestamp);
collector.collect(resultEvent);
keysToRemove.add(key);
}
}
-
for (String s : keysToRemove) {
stateBuffer.remove(s);
}
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
index 8a09b578ac..08b2601dbb 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
@@ -18,13 +18,14 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventSchema;
@@ -35,6 +36,7 @@ import
org.apache.streampipes.processors.transformation.jvm.processor.state.labe
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
@@ -43,8 +45,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import com.google.common.math.Stats;
@@ -60,8 +60,8 @@ import static
org.apache.streampipes.processors.transformation.jvm.processor.sta
import static
org.apache.streampipes.processors.transformation.jvm.processor.state.StateUtils.getLabelStrings;
import static
org.apache.streampipes.processors.transformation.jvm.processor.state.StateUtils.getNumberValues;
-public class StateBufferLabelerProcessor extends StreamPipesDataProcessor
- implements
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
+public class StateBufferLabelerProcessor implements IStreamPipesDataProcessor,
+ ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
public static final String STATE_FILTER_ID = "stateFilterId";
public static final String STATE_FIELD_ID = "stateFieldId";
@@ -82,69 +82,65 @@ public class StateBufferLabelerProcessor extends
StreamPipesDataProcessor
private List<Statement> statements;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer",
0)
- .category(DataProcessorType.STRING_OPERATOR)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.listRequirement(),
- Labels.withId(SENSOR_VALUE_ID),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.semanticTypeReqList(SPSensor.STATE),
- Labels.withId(STATE_FIELD_ID),
- PropertyScope.NONE)
- .build())
- .requiredTextParameter(Labels.withId(LABEL_NAME))
- .requiredTextParameter(Labels.withId(STATE_FILTER_ID))
- .requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
- Options.from(MINIMUM, MAXIMUM, AVERAGE))
- .requiredCollection(
- Labels.withId(LABEL_COLLECTION_ID),
- StaticProperties.group(Labels.from("group", "Group", ""), false,
-
StaticProperties.singleValueSelection(Labels.withId(COMPARATOR_ID),
- Options.from("<", "<=", ">", ">=", "==", "*")),
-
StaticProperties.doubleFreeTextProperty(Labels.withId(NUMBER_VALUE_ID)),
-
StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID))
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ StateBufferLabelerProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer",
0)
+ .category(DataProcessorType.STRING_OPERATOR)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.listRequirement(),
+ Labels.withId(SENSOR_VALUE_ID),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReqList(SPSensor.STATE),
+ Labels.withId(STATE_FIELD_ID),
+ PropertyScope.NONE)
+ .build())
+ .requiredTextParameter(Labels.withId(LABEL_NAME))
+ .requiredTextParameter(Labels.withId(STATE_FILTER_ID))
+ .requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
+ Options.from(MINIMUM, MAXIMUM, AVERAGE))
+ .requiredCollection(
+ Labels.withId(LABEL_COLLECTION_ID),
+ StaticProperties.group(Labels.from("group", "Group", ""),
false,
+
StaticProperties.singleValueSelection(Labels.withId(COMPARATOR_ID),
+ Options.from("<", "<=", ">", ">=", "==", "*")),
+
StaticProperties.doubleFreeTextProperty(Labels.withId(NUMBER_VALUE_ID)),
+
StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID))
+ )
)
- )
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(Labels.withId(LABEL), LABEL, SPSensor.STATE,
PropertyScope.DIMENSION_PROPERTY)
- ))
- .build();
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(Labels.withId(LABEL), LABEL,
SPSensor.STATE, PropertyScope.DIMENSION_PROPERTY)
+ ))
+ .build()
+ );
}
@Override
public EventSchema resolveOutputStrategy(DataProcessorInvocation
processingElement,
- ProcessingElementParameterExtractor
parameterExtractor)
- throws SpRuntimeException {
-
+ ProcessingElementParameterExtractor
parameterExtractor) {
String labelName = getLabelName(parameterExtractor);
-
List<String> labelStrings = getLabelStrings(parameterExtractor);
-
return LabelerUtils.resolveOutputStrategy(processingElement, labelName,
labelStrings);
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
sensorListValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_ID);
stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
stateFilter = extractor.singleValueParameter(STATE_FILTER_ID,
String.class);
selectedOperation = extractor.selectedSingleValue(OPERATIONS_ID,
String.class);
-
labelName = getLabelName(extractor);
-
List<Double> numberValues = getNumberValues(extractor);
List<String> labelStrings = getLabelStrings(extractor);
List<String> comparators = getComparators(extractor);
-
statements = StatementUtils.getStatements(
numberValues,
labelStrings,
@@ -152,15 +148,13 @@ public class StateBufferLabelerProcessor extends
StreamPipesDataProcessor
}
@Override
- public void onEvent(Event inputEvent,
- SpOutputCollector collector) throws SpRuntimeException {
+ public void onEvent(Event inputEvent, SpOutputCollector collector) {
List<Double> values =
inputEvent.getFieldBySelector(this.sensorListValueProperty).getAsList().parseAsSimpleType(Double.class);
List<String> states =
inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
if (states.contains(this.stateFilter) || this.stateFilter.equals("*")) {
double calculatedValue;
-
if (StateBufferLabelerProcessor.MAXIMUM.equals(this.selectedOperation)) {
calculatedValue = Stats.of(values).max();
} else if
(StateBufferLabelerProcessor.MINIMUM.equals(this.selectedOperation)) {
@@ -168,14 +162,12 @@ public class StateBufferLabelerProcessor extends
StreamPipesDataProcessor
} else {
calculatedValue = Stats.of(values).mean();
}
-
Event resultEvent = StatementUtils.addLabel(inputEvent, labelName,
calculatedValue, this.statements);
collector.collect(resultEvent);
}
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
index 44fa065b28..552f0a450c 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
@@ -17,26 +17,26 @@
*/
package org.apache.streampipes.processors.transformation.jvm.processor.task;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class TaskDurationProcessor extends StreamPipesDataProcessor {
+public class TaskDurationProcessor implements IStreamPipesDataProcessor {
private static final String TASK_FIELD_KEY = "task-field";
private static final String TIMESTAMP_FIELD_KEY = "timestamp-field";
@@ -56,34 +56,36 @@ public class TaskDurationProcessor extends
StreamPipesDataProcessor {
private Long lastTimestamp;
private Double outputDivisor;
-
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.transformation.jvm.taskduration", 0)
- .category(DataProcessorType.TIME)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.anyProperty(),
- Labels.withId(TASK_FIELD_KEY),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_FIELD_KEY), PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
Options.from(MILLISECONDS, SECONDS, MINUTES))
-
.outputStrategy(OutputStrategies.fixed(EpProperties.stringEp(Labels.withId(TASK_ID),
- "processId", "http://schema.org/taskId"),
- EpProperties.integerEp(Labels.withId(DURATION_ID), "duration",
- "http://schema.org/duration")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ TaskDurationProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.transformation.jvm.taskduration", 0)
+ .category(DataProcessorType.TIME)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.anyProperty(),
+ Labels.withId(TASK_FIELD_KEY),
+ PropertyScope.NONE)
+
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_FIELD_KEY), PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
Options.from(MILLISECONDS, SECONDS, MINUTES))
+
.outputStrategy(OutputStrategies.fixed(EpProperties.stringEp(Labels.withId(TASK_ID),
+ "processId", "http://schema.org/taskId"),
+ EpProperties.integerEp(Labels.withId(DURATION_ID), "duration",
+ "http://schema.org/duration")))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
var extractor = parameters.extractor();
taskFieldSelector = extractor.mappingPropertyValue(TASK_FIELD_KEY);
timestampFieldSelector =
extractor.mappingPropertyValue(TIMESTAMP_FIELD_KEY);
@@ -99,7 +101,7 @@ public class TaskDurationProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event,
- SpOutputCollector collector) throws SpRuntimeException {
+ SpOutputCollector collector) {
String taskValue =
event.getFieldBySelector(taskFieldSelector).getAsPrimitive().getAsString();
Long timestampValue =
event.getFieldBySelector(timestampFieldSelector).getAsPrimitive().getAsLong();
@@ -111,7 +113,6 @@ public class TaskDurationProcessor extends
StreamPipesDataProcessor {
if (!this.lastValue.equals(taskValue)) {
Long duration = timestampValue - this.lastTimestamp;
-
double result = duration / this.outputDivisor;
Event outEvent = new Event();
@@ -131,7 +132,7 @@ public class TaskDurationProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
this.lastValue = null;
}
}