This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3622-update-processors-in-streampipes-processors-transformation-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3622-update-processors-in-streampipes-processors-transformation-jvm by this push:
     new 554561dead refactor(#3622): Update processors to use IStreamPipesDataProcessor
554561dead is described below

commit 554561deadc8f0e4ce35ab853fb1f8703cf9a310
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri May 23 09:27:18 2025 +0200

    refactor(#3622): Update processors to use IStreamPipesDataProcessor
---
 .../processor/array/count/CountArrayProcessor.java |  54 ++++----
 .../processor/array/split/SplitArrayProcessor.java | 109 +++++++++--------
 .../edge/SignalEdgeFilterProcessor.java            |  63 +++++-----
 .../inverter/BooleanInverterProcessor.java         |  57 ++++-----
 .../logical/BooleanOperatorProcessor.java          |  79 ++++++------
 .../state/BooleanToStateProcessor.java             |  62 +++++-----
 .../timekeeping/BooleanTimekeepingProcessor.java   |  88 ++++++-------
 .../booloperator/timer/BooleanTimerProcessor.java  |  72 +++++------
 .../CsvMetadataEnrichmentProcessor.java            | 136 +++++++++++++--------
 .../datetime/DateTimeFromStringProcessor.java      |  78 ++++++------
 .../MeasurementUnitConverterProcessor.java         |  62 +++++-----
 .../jvm/processor/round/RoundProcessor.java        |  55 +++++----
 .../jvm/processor/state/StateUtils.java            |  12 +-
 .../state/buffer/StateBufferProcessor.java         |  88 ++++++-------
 .../buffer/StateBufferLabelerProcessor.java        | 102 +++++++---------
 .../jvm/processor/task/TaskDurationProcessor.java  |  65 +++++-----
 16 files changed, 600 insertions(+), 582 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
index 0e72d26823..6008c99117 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
@@ -18,29 +18,29 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.AbstractField;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.List;
 
-public class CountArrayProcessor extends StreamPipesDataProcessor {
+public class CountArrayProcessor implements IStreamPipesDataProcessor {
 
   public static final String COUNT_NAME = "countValue";
   public static final String ARRAY_FIELD = "array-field";
@@ -48,41 +48,39 @@ public class CountArrayProcessor extends 
StreamPipesDataProcessor {
   private String arrayField;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.count-array", 0)
-        .category(DataProcessorType.COUNT_OPERATOR)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(
-            StreamRequirementsBuilder.create()
-                
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
-                    Labels.withId(ARRAY_FIELD), PropertyScope.NONE)
-                .build())
-        
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(Labels.empty(), 
COUNT_NAME,
-            SO.NUMBER)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        CountArrayProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.count-array", 0)
+            .category(DataProcessorType.COUNT_OPERATOR)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(
+                StreamRequirementsBuilder.create()
+                    
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
+                        Labels.withId(ARRAY_FIELD), PropertyScope.NONE)
+                    .build())
+            
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(Labels.empty(), 
COUNT_NAME, SO.NUMBER)))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     this.arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD);
   }
 
   @Override
-  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-
+  public void onEvent(Event event, SpOutputCollector collector) {
     List<AbstractField> allEvents = 
event.getFieldBySelector(arrayField).getAsList().getRawValue();
-
     event.addField(CountArrayProcessor.COUNT_NAME, allEvents.size());
-
     collector.collect(event);
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
index 742c0243d8..ac924933df 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java
@@ -18,13 +18,14 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.array.split;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import 
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.AbstractField;
@@ -36,21 +37,20 @@ import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class SplitArrayProcessor extends StreamPipesDataProcessor
-    implements 
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, 
ProcessingElementParameterExtractor> {
-
+public class SplitArrayProcessor
+    implements IStreamPipesDataProcessor,
+               
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, 
ProcessingElementParameterExtractor> {
   public static final String KEEP_PROPERTIES_ID = "keep";
   public static final String ARRAY_FIELD_ID = "array-field";
   public static final String VALUE = "array_value";
@@ -59,28 +59,36 @@ public class SplitArrayProcessor extends 
StreamPipesDataProcessor
   private List<String> keepProperties;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.split-array", 0)
-        .category(DataProcessorType.TRANSFORM)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(),
-                Labels.withId(KEEP_PROPERTIES_ID),
-                PropertyScope.NONE)
-            .requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
-                Labels.withId(ARRAY_FIELD_ID),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.customTransformation())
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        SplitArrayProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.split-array", 0)
+            .category(DataProcessorType.TRANSFORM)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                                                     
.requiredPropertyWithNaryMapping(
+                                                         
EpRequirements.anyProperty(),
+                                                         
Labels.withId(KEEP_PROPERTIES_ID),
+                                                         PropertyScope.NONE
+                                                     )
+                                                     
.requiredPropertyWithUnaryMapping(
+                                                         
EpRequirements.listRequirement(),
+                                                         
Labels.withId(ARRAY_FIELD_ID),
+                                                         PropertyScope.NONE
+                                                     )
+                                                     .build())
+            .outputStrategy(OutputStrategies.customTransformation())
+            .build()
+    );
   }
 
   @Override
-  public EventSchema resolveOutputStrategy(DataProcessorInvocation 
processingElement,
-                                           ProcessingElementParameterExtractor 
extractor)
-      throws SpRuntimeException {
+  public EventSchema resolveOutputStrategy(
+      DataProcessorInvocation processingElement,
+      ProcessingElementParameterExtractor extractor
+  ) {
     String arrayFieldSelector = extractor.mappingPropertyValue(ARRAY_FIELD_ID);
     List<String> keepPropertySelectors = 
extractor.mappingPropertyValues(KEEP_PROPERTIES_ID);
 
@@ -91,8 +99,7 @@ public class SplitArrayProcessor extends 
StreamPipesDataProcessor
     newProperty.setLabel("Array Value");
     newProperty.setDescription("Contains values of the array. Created by Split 
Array processor.");
 
-    List<EventProperty> keepProperties = extractor.getEventPropertiesBySelector
-        (keepPropertySelectors);
+    List<EventProperty> keepProperties = 
extractor.getEventPropertiesBySelector(keepPropertySelectors);
     outProperties.add(newProperty);
     outProperties.addAll(keepProperties);
 
@@ -100,32 +107,36 @@ public class SplitArrayProcessor extends 
StreamPipesDataProcessor
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD_ID);
-    keepProperties = 
parameters.extractor().mappingPropertyValues(KEEP_PROPERTIES_ID);
+  public void onPipelineStarted(
+      IDataProcessorParameters parameters,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    arrayField = parameters.extractor()
+                           .mappingPropertyValue(ARRAY_FIELD_ID);
+    keepProperties = parameters.extractor()
+                               .mappingPropertyValues(KEEP_PROPERTIES_ID);
   }
 
   @Override
-  public void onEvent(Event event,
-                      SpOutputCollector collector) throws SpRuntimeException {
-
-    List<AbstractField> allEvents = 
event.getFieldBySelector(arrayField).getAsList()
-        .parseAsCustomType(o -> {
-          if (o instanceof NestedField) {
-            return o;
-          } else if (o instanceof ListField) {
-            return o;
-          } else {
-            return o;
-          }
-        });
+  public void onEvent(Event event, SpOutputCollector collector) {
+    List<AbstractField> allEvents = event.getFieldBySelector(arrayField)
+                                         .getAsList()
+                                         .parseAsCustomType(o -> {
+                                           if (o instanceof NestedField) {
+                                             return o;
+                                           } else if (o instanceof ListField) {
+                                             return o;
+                                           } else {
+                                             return o;
+                                           }
+                                         });
 
     for (AbstractField field : allEvents) {
       Event outEvent = new Event();
       if (field instanceof NestedField) {
-        for (Map.Entry<String, AbstractField> key : ((NestedField) 
field).getRawValue().entrySet()) {
+        for (Map.Entry<String, AbstractField> key : ((NestedField) 
field).getRawValue()
+                                                                         
.entrySet()) {
           outEvent.addField(key.getValue());
         }
       } else {
@@ -141,7 +152,7 @@ public class SplitArrayProcessor extends 
StreamPipesDataProcessor
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
+
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
index efd28c53d4..48b1e9f8d8 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
@@ -18,28 +18,28 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class SignalEdgeFilterProcessor extends StreamPipesDataProcessor {
+public class SignalEdgeFilterProcessor implements IStreamPipesDataProcessor {
 
   public static final String BOOLEAN_SIGNAL_FIELD = "boolean_signal_field";
   public static final String FLANK_ID = "flank";
@@ -64,33 +64,35 @@ public class SignalEdgeFilterProcessor extends 
StreamPipesDataProcessor {
   private boolean edgeDetected;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge",
 0)
-        .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.FILTER)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(), 
Labels.withId(BOOLEAN_SIGNAL_FIELD),
-                PropertyScope.NONE)
-            .build())
-        .requiredSingleValueSelection(Labels.withId(FLANK_ID), 
Options.from(BOTH, FLANK_UP, FLANK_DOWN))
-        .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
-        .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
-            Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
-        .outputStrategy(OutputStrategies.keep())
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        SignalEdgeFilterProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge",
 0)
+            .category(DataProcessorType.BOOLEAN_OPERATOR, 
DataProcessorType.FILTER)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(), 
Labels.withId(BOOLEAN_SIGNAL_FIELD),
+                    PropertyScope.NONE)
+                .build())
+            .requiredSingleValueSelection(Labels.withId(FLANK_ID), 
Options.from(BOTH, FLANK_UP, FLANK_DOWN))
+            .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
+            .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
+                Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
+            .outputStrategy(OutputStrategies.keep())
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     booleanSignalField = 
parameters.extractor().mappingPropertyValue(BOOLEAN_SIGNAL_FIELD);
     flank = parameters.extractor().selectedSingleValue(FLANK_ID, String.class);
     delay = parameters.extractor().singleValueParameter(DELAY_ID, 
Integer.class);
     eventSelection = 
parameters.extractor().selectedSingleValue(EVENT_SELECTION_ID, String.class);
-
     this.lastValue = false;
     this.delayCount = 0;
     this.resultEvents = new ArrayList<>();
@@ -99,39 +101,32 @@ public class SignalEdgeFilterProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event inputEvent,
-                      SpOutputCollector collector) throws SpRuntimeException {
+                      SpOutputCollector collector) {
     boolean value = 
inputEvent.getFieldBySelector(this.booleanSignalField).getAsPrimitive().getAsBoolean();
-
     // Detect edges in signal
     if (detectEdge(value, lastValue)) {
       this.edgeDetected = true;
       this.resultEvents = new ArrayList<>();
       this.delayCount = 0;
     }
-
     if (edgeDetected) {
       // Buffer event(s) according to user configuration
       addResultEvent(inputEvent);
-
       // Detect if the delay has been waited for
       if (this.delay == delayCount) {
         for (Event event : this.resultEvents) {
           collector.collect(event);
         }
-
         this.edgeDetected = false;
-
       } else {
         this.delayCount++;
       }
     }
-
     this.lastValue = value;
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 
   private boolean detectEdge(boolean value, boolean lastValue) {
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
index 6be14a9edc..fc2677b868 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/inverter/BooleanInverterProcessor.java
@@ -18,61 +18,62 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.inverter;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class BooleanInverterProcessor extends StreamPipesDataProcessor {
+public class BooleanInverterProcessor implements IStreamPipesDataProcessor {
   public static final String INVERT_FIELD_ID = "invert-field";
   private String invertFieldName;
+
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
 0)
-        .category(DataProcessorType.BOOLEAN_OPERATOR)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.booleanReq(),
-                Labels.withId(INVERT_FIELD_ID),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.keep())
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        BooleanInverterProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
 0)
+            .category(DataProcessorType.BOOLEAN_OPERATOR)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.booleanReq(),
+                    Labels.withId(INVERT_FIELD_ID),
+                    PropertyScope.NONE)
+                .build())
+            .outputStrategy(OutputStrategies.keep())
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    ProcessingElementParameterExtractor extractor = parameters.extractor();
-    this.invertFieldName = extractor.mappingPropertyValue(INVERT_FIELD_ID);
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
+    this.invertFieldName = 
parameters.extractor().mappingPropertyValue(INVERT_FIELD_ID);
   }
 
   @Override
-  public void onEvent(Event inputEvent, SpOutputCollector collector) throws 
SpRuntimeException {
+  public void onEvent(Event inputEvent, SpOutputCollector collector) {
     boolean field = 
inputEvent.getFieldBySelector(invertFieldName).getAsPrimitive().getAsBoolean();
     inputEvent.updateFieldBySelector(invertFieldName, !field);
     collector.collect(inputEvent);
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
index c367374a9e..a08ea8f201 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/logical/BooleanOperatorProcessor.java
@@ -19,11 +19,13 @@
 package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical.enums.BooleanOperatorType;
@@ -32,20 +34,19 @@ import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperat
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.List;
 
 import static 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical.enums.BooleanOperatorType.NOT;
 
-public class BooleanOperatorProcessor extends StreamPipesDataProcessor {
+public class BooleanOperatorProcessor implements IStreamPipesDataProcessor {
 
   private static final String BOOLEAN_PROCESSOR_OUT_KEY = 
"boolean-operations-result";
   private static final String BOOLEAN_OPERATOR_TYPE = "operator-field";
@@ -53,38 +54,42 @@ public class BooleanOperatorProcessor extends 
StreamPipesDataProcessor {
   private BooleanOperationInputConfigs configs;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.logical",
 0)
-        .withAssets(ExtensionAssetType.DOCUMENTATION)
-        .withLocales(Locales.EN)
-        .category(DataProcessorType.BOOLEAN_OPERATOR)
-        .requiredStream(
-            StreamRequirementsBuilder
-                .create()
-                .requiredPropertyWithNaryMapping(EpRequirements.booleanReq(),
-                    Labels.withId(PROPERTIES_LIST), PropertyScope.NONE)
-                .build())
-        .requiredSingleValueSelection(Labels.withId(BOOLEAN_OPERATOR_TYPE), 
Options.from(
-            BooleanOperatorType.AND.operator(),
-            BooleanOperatorType.OR.operator(),
-            BooleanOperatorType.NOT.operator(),
-            BooleanOperatorType.XOR.operator(),
-            BooleanOperatorType.X_NOR.operator(),
-            BooleanOperatorType.NOR.operator()))
-        .outputStrategy(OutputStrategies.append(
-            PrimitivePropertyBuilder.create(
-                    Datatypes.Boolean, BOOLEAN_PROCESSOR_OUT_KEY)
-                .build())
-        )
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        BooleanOperatorProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.logical",
 0)
+            .withAssets(ExtensionAssetType.DOCUMENTATION)
+            .withLocales(Locales.EN)
+            .category(DataProcessorType.BOOLEAN_OPERATOR)
+            .requiredStream(
+                StreamRequirementsBuilder
+                    .create()
+                    
.requiredPropertyWithNaryMapping(EpRequirements.booleanReq(),
+                        Labels.withId(PROPERTIES_LIST), PropertyScope.NONE)
+                    .build())
+            
.requiredSingleValueSelection(Labels.withId(BOOLEAN_OPERATOR_TYPE), 
Options.from(
+                BooleanOperatorType.AND.operator(),
+                BooleanOperatorType.OR.operator(),
+                BooleanOperatorType.NOT.operator(),
+                BooleanOperatorType.XOR.operator(),
+                BooleanOperatorType.X_NOR.operator(),
+                BooleanOperatorType.NOR.operator()))
+            .outputStrategy(OutputStrategies.append(
+                PrimitivePropertyBuilder.create(
+                        Datatypes.Boolean, BOOLEAN_PROCESSOR_OUT_KEY)
+                    .build())
+            )
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams processorParams, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext 
eventProcessorRuntimeContext) throws SpRuntimeException {
-    List<String> properties = 
processorParams.extractor().mappingPropertyValues(PROPERTIES_LIST);
-    String operator = 
processorParams.extractor().selectedSingleValue(BOOLEAN_OPERATOR_TYPE, 
String.class);
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
+    List<String> properties = 
parameters.extractor().mappingPropertyValues(PROPERTIES_LIST);
+    String operator = 
parameters.extractor().selectedSingleValue(BOOLEAN_OPERATOR_TYPE, String.class);
     BooleanOperationInputConfigs configs =
         new BooleanOperationInputConfigs(properties, 
BooleanOperatorType.getBooleanOperatorType(operator));
     preChecks(configs);
@@ -92,7 +97,7 @@ public class BooleanOperatorProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws 
SpRuntimeException {
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) {
     List<String> properties = configs.getProperties();
     BooleanOperatorType operatorType = configs.getOperator();
     Boolean firstProperty = 
event.getFieldBySelector(properties.get(0)).getAsPrimitive().getAsBoolean();
@@ -101,24 +106,21 @@ public class BooleanOperatorProcessor extends 
StreamPipesDataProcessor {
     if (properties.size() == 1) {
       // support for NOT operator
       result = boolOperation.evaluate(firstProperty, firstProperty);
-
     } else {
       Boolean secondProperty = 
event.getFieldBySelector(properties.get(1)).getAsPrimitive().getAsBoolean();
       result = boolOperation.evaluate(firstProperty, secondProperty);
-
       //loop through rest of the properties to get final result
       for (int i = 2; i < properties.size(); i++) {
         result =
             boolOperation.evaluate(result, 
event.getFieldBySelector(properties.get(i)).getAsPrimitive().getAsBoolean());
       }
-
     }
     event.addField(BOOLEAN_PROCESSOR_OUT_KEY, result);
     spOutputCollector.collect(event);
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     configs = null;
   }
 
@@ -132,3 +134,4 @@ public class BooleanOperatorProcessor extends 
StreamPipesDataProcessor {
     }
   }
 }
+
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
index 2736cc7864..81f5f5e1a8 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/state/BooleanToStateProcessor.java
@@ -19,15 +19,18 @@
 package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.CodeLanguage;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
@@ -36,8 +39,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.slf4j.Logger;
@@ -46,7 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 
-public class BooleanToStateProcessor extends StreamPipesDataProcessor {
+public class BooleanToStateProcessor implements IStreamPipesDataProcessor {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BooleanToStateProcessor.class);
 
@@ -67,38 +68,39 @@ public class BooleanToStateProcessor extends 
StreamPipesDataProcessor {
       + "}";
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state",
 0)
-        .category(DataProcessorType.BOOLEAN_OPERATOR)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithNaryMapping(EpRequirements.booleanReq(), 
Labels.withId(BOOLEAN_STATE_FIELD),
-                PropertyScope.NONE)
-            .build())
-        .requiredTextParameter(Labels.withId(DEFAULT_STATE_ID))
-        .requiredCodeblock(Labels.withId(JSON_CONFIGURATION), 
CodeLanguage.Javascript, defaultSkeleton)
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.stringEp(Labels.withId(CURRENT_STATE), CURRENT_STATE, 
SPSensor.STATE)
-        ))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        BooleanToStateProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state",
 0)
+            .category(DataProcessorType.BOOLEAN_OPERATOR)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithNaryMapping(EpRequirements.booleanReq(), 
Labels.withId(BOOLEAN_STATE_FIELD),
+                    PropertyScope.NONE)
+                .build())
+            .requiredTextParameter(Labels.withId(DEFAULT_STATE_ID))
+            .requiredCodeblock(Labels.withId(JSON_CONFIGURATION), 
CodeLanguage.Javascript, defaultSkeleton)
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.stringEp(Labels.withId(CURRENT_STATE), 
CURRENT_STATE, SPSensor.STATE)
+            ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     stateFields = extractor.mappingPropertyValues(BOOLEAN_STATE_FIELD);
     defaultState = extractor.singleValueParameter(DEFAULT_STATE_ID, 
String.class);
     String jsonConfigurationString = 
extractor.codeblockValue(JSON_CONFIGURATION);
-
     try {
       jsonConfigurationString = 
jsonConfigurationString.replaceAll("(?m)^//.*", "");
       jsonConfiguration =
           
JacksonSerializer.getObjectMapper().readValue(jsonConfigurationString, 
Map.class);
-
     } catch (JsonProcessingException e) {
       LOG.info("Error when parsing the json configuration: " + 
jsonConfigurationString);
       throw new SpRuntimeException("The following mapping configuration is not 
valid: " + jsonConfigurationString);
@@ -107,26 +109,22 @@ public class BooleanToStateProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event,
-                      SpOutputCollector collector) throws SpRuntimeException {
+                      SpOutputCollector collector) {
     String state = this.defaultState;
-
     for (String stateField : stateFields) {
       if 
(event.getFieldBySelector(stateField).getAsPrimitive().getAsBoolean().equals(true))
 {
         state = event.getFieldBySelector(stateField).getFieldNameIn();
       }
     }
-
-    // replace the state string when user provided a mapping
     if (this.jsonConfiguration.containsKey(state)) {
       state = this.jsonConfiguration.get(state);
     }
-
     event.addField(BooleanToStateProcessor.CURRENT_STATE, state);
     collector.collect(event);
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
+
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
index 71be83b2a9..97e1f26ed7 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timekeeping/BooleanTimekeepingProcessor.java
@@ -18,28 +18,28 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.LinkedList;
 
-public class BooleanTimekeepingProcessor extends StreamPipesDataProcessor {
+public class BooleanTimekeepingProcessor implements IStreamPipesDataProcessor {
   // Measures time and returns count
 
   public static final String LEFT_FIELD_ID = "left-field";
@@ -65,42 +65,44 @@ public class BooleanTimekeepingProcessor extends 
StreamPipesDataProcessor {
   private double outputDivisor;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timekeeping",
 0)
-        .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.TIME)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON, 
"time_measure_example.png")
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.booleanReq(),
-                Labels.withId(LEFT_FIELD_ID),
-                PropertyScope.NONE)
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.booleanReq(),
-                Labels.withId(RIGHT_FIELD_ID),
-                PropertyScope.NONE)
-            .build())
-        .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), 
Options.from(MILLISECONDS, SECONDS, MINUTES))
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.numberEp(Labels.withId(TIME_FIELD_ID),
-                "measured_time",
-                "http://schema.org/Number"),
-            EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID),
-                "counter",
-                "http://schema.org/Number")
-
-        ))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        BooleanTimekeepingProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timekeeping",
 0)
+            .category(DataProcessorType.BOOLEAN_OPERATOR, 
DataProcessorType.TIME)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON, "time_measure_example.png")
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.booleanReq(),
+                    Labels.withId(LEFT_FIELD_ID),
+                    PropertyScope.NONE)
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.booleanReq(),
+                    Labels.withId(RIGHT_FIELD_ID),
+                    PropertyScope.NONE)
+                .build())
+            .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), 
Options.from(MILLISECONDS, SECONDS, MINUTES))
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.numberEp(Labels.withId(TIME_FIELD_ID),
+                    "measured_time",
+                    "http://schema.org/Number"),
+                EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID),
+                    "counter",
+                    "http://schema.org/Number")
+            ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     leftFieldName = extractor.mappingPropertyValue(LEFT_FIELD_ID);
-    rightFieldName = extractor.mappingPropertyValue(LEFT_FIELD_ID);
+    rightFieldName = extractor.mappingPropertyValue(RIGHT_FIELD_ID);
     String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID, 
String.class);
     leftFieldLast = false;
     rightFieldLast = false;
@@ -116,40 +118,30 @@ public class BooleanTimekeepingProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onEvent(Event inputEvent, SpOutputCollector collector) throws 
SpRuntimeException {
+  public void onEvent(Event inputEvent, SpOutputCollector collector) {
     boolean leftField = 
inputEvent.getFieldBySelector(leftFieldName).getAsPrimitive().getAsBoolean();
     boolean rightField = 
inputEvent.getFieldBySelector(rightFieldName).getAsPrimitive().getAsBoolean();
 
-
     if (!rightFieldLast && rightField) {
       if (this.allPending.size() > 0) {
         Long startTime = this.allPending.removeLast();
-
         Long timeDifference = System.currentTimeMillis() - startTime;
-
         double result = timeDifference / this.outputDivisor;
-
         this.counter++;
-
         if (this.counter == Long.MAX_VALUE) {
           this.counter = 0L;
         }
-
         inputEvent.addField("measured_time", result);
         inputEvent.addField("counter", this.counter);
-
         collector.collect(inputEvent);
       }
     }
-
-
     if (!leftFieldLast && leftField) {
       this.allPending.push(System.currentTimeMillis());
     }
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
index 26eb03588f..430fe41bb8 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/timer/BooleanTimerProcessor.java
@@ -18,26 +18,26 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class BooleanTimerProcessor extends StreamPipesDataProcessor {
+public class BooleanTimerProcessor implements IStreamPipesDataProcessor {
 
   public static final String FIELD_ID = "field";
   public static final String TIMER_FIELD_ID = "timerField";
@@ -58,47 +58,45 @@ public class BooleanTimerProcessor extends 
StreamPipesDataProcessor {
 
   private double outputDivisor;
 
-
-
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timer",
 0)
-        .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.TIME)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.booleanReq(),
-                Labels.withId(FIELD_ID),
-                PropertyScope.NONE)
-            .build())
-        .requiredSingleValueSelection(Labels.withId(TIMER_FIELD_ID), 
Options.from(TRUE, FALSE))
-        .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), 
Options.from(MILLISECONDS, SECONDS, MINUTES))
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID),
-                "measured_time",
-                "http://schema.org/Number")
-        ))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        BooleanTimerProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.booloperator.timer",
 0)
+            .category(DataProcessorType.BOOLEAN_OPERATOR, 
DataProcessorType.TIME)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.booleanReq(),
+                    Labels.withId(FIELD_ID),
+                    PropertyScope.NONE)
+                .build())
+            .requiredSingleValueSelection(Labels.withId(TIMER_FIELD_ID), 
Options.from(TRUE, FALSE))
+            .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), 
Options.from(MILLISECONDS, SECONDS, MINUTES))
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID),
+                    "measured_time",
+                    "http://schema.org/Number")
+            ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     fieldName = extractor.mappingPropertyValue(FIELD_ID);
     String measureTrueString = extractor.selectedSingleValue(TIMER_FIELD_ID, 
String.class);
     String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID, 
String.class);
-
     measureTrue = false;
     timestamp = Long.MIN_VALUE;
     if (measureTrueString.equals(TRUE)) {
       measureTrue = true;
     }
-
     outputDivisor = 1.0;
     if (outputUnit.equals(SECONDS)) {
       outputDivisor = 1000.0;
@@ -109,9 +107,8 @@ public class BooleanTimerProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event inputEvent,
-                      SpOutputCollector collector) throws SpRuntimeException {
+                      SpOutputCollector collector) {
     boolean field = 
inputEvent.getFieldBySelector(this.fieldName).getAsPrimitive().getAsBoolean();
-
     if (this.measureTrue == field) {
       if (timestamp == Long.MIN_VALUE) {
         timestamp = System.currentTimeMillis();
@@ -119,9 +116,7 @@ public class BooleanTimerProcessor extends 
StreamPipesDataProcessor {
     } else {
       if (timestamp != Long.MIN_VALUE) {
         Long difference = System.currentTimeMillis() - timestamp;
-
         double result = difference / this.outputDivisor;
-
         inputEvent.addField("measured_time", result);
         timestamp = Long.MIN_VALUE;
         collector.collect(inputEvent);
@@ -130,7 +125,6 @@ public class BooleanTimerProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
index d4ea565869..61ff8c374b 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/csvmetadata/CsvMetadataEnrichmentProcessor.java
@@ -27,14 +27,16 @@ import 
org.apache.streampipes.commons.parser.PrimitiveTypeParser;
 import org.apache.streampipes.commons.parser.StringParser;
 import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import 
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
 import 
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
 import 
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventProperty;
@@ -43,6 +45,7 @@ import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Filetypes;
@@ -51,8 +54,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.helpers.Tuple2;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
@@ -68,12 +69,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
+import static org.apache.streampipes.processors.transformation.jvm.processor
+    .csvmetadata.CsvMetadataEnrichmentUtils.getCsvParser;
 
 public class CsvMetadataEnrichmentProcessor
-    extends StreamPipesDataProcessor
-    implements ResolvesContainerProvidedOptions,
-    ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, 
ProcessingElementParameterExtractor> {
+    implements IStreamPipesDataProcessor,
+               ResolvesContainerProvidedOptions,
+               
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, 
ProcessingElementParameterExtractor> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CsvMetadataEnrichmentProcessor.class);
 
@@ -88,30 +90,40 @@ public class CsvMetadataEnrichmentProcessor
   private Map<String, CSVRecord> columnMap;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.csvmetadata", 0)
-        .category(DataProcessorType.ENRICH)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.anyProperty(),
-                Labels.withId(MAPPING_FIELD_KEY),
-                PropertyScope.NONE)
-            .build())
-        .requiredFile(Labels.withId(CSV_FILE_KEY), Filetypes.CSV)
-        
.requiredSingleValueSelectionFromContainer(Labels.withId(FIELD_TO_MATCH),
-            Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY))
-        
.requiredMultiValueSelectionFromContainer(Labels.withId(FIELDS_TO_APPEND_KEY),
-            Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY, FIELD_TO_MATCH))
-        .outputStrategy(OutputStrategies.customTransformation())
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        CsvMetadataEnrichmentProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.csvmetadata", 0)
+            .category(DataProcessorType.ENRICH)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                                                     
.requiredPropertyWithUnaryMapping(
+                                                         
EpRequirements.anyProperty(),
+                                                         
Labels.withId(MAPPING_FIELD_KEY),
+                                                         PropertyScope.NONE
+                                                     )
+                                                     .build())
+            .requiredFile(Labels.withId(CSV_FILE_KEY), Filetypes.CSV)
+            .requiredSingleValueSelectionFromContainer(
+                Labels.withId(FIELD_TO_MATCH),
+                Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY)
+            )
+            .requiredMultiValueSelectionFromContainer(
+                Labels.withId(FIELDS_TO_APPEND_KEY),
+                Arrays.asList(MAPPING_FIELD_KEY, CSV_FILE_KEY, FIELD_TO_MATCH)
+            )
+            .outputStrategy(OutputStrategies.customTransformation())
+            .build()
+    );
   }
 
   @Override
-  public List<Option> resolveOptions(String requestId,
-                                     IStaticPropertyExtractor 
parameterExtractor) {
+  public List<Option> resolveOptions(
+      String requestId,
+      IStaticPropertyExtractor parameterExtractor
+  ) {
     try {
       String fileContents = getFileContents(parameterExtractor);
       if (requestId.equals(FIELDS_TO_APPEND_KEY)) {
@@ -127,8 +139,10 @@ public class CsvMetadataEnrichmentProcessor
   }
 
   @Override
-  public EventSchema resolveOutputStrategy(DataProcessorInvocation 
processingElement,
-                                           ProcessingElementParameterExtractor 
parameterExtractor)
+  public EventSchema resolveOutputStrategy(
+      DataProcessorInvocation processingElement,
+      ProcessingElementParameterExtractor parameterExtractor
+  )
       throws SpRuntimeException {
     List<EventProperty> properties = processingElement
         .getInputStreams()
@@ -136,8 +150,10 @@ public class CsvMetadataEnrichmentProcessor
         .getEventSchema()
         .getEventProperties();
 
-    List<String> columnsToInclude = 
parameterExtractor.selectedMultiValues(FIELDS_TO_APPEND_KEY,
-        String.class);
+    List<String> columnsToInclude = parameterExtractor.selectedMultiValues(
+        FIELDS_TO_APPEND_KEY,
+        String.class
+    );
     try {
       String fileContents = getFileContents(parameterExtractor);
       properties.addAll(getAppendProperties(fileContents, columnsToInclude));
@@ -148,8 +164,10 @@ public class CsvMetadataEnrichmentProcessor
     return new EventSchema(properties);
   }
 
-  private List<EventProperty> getAppendProperties(String fileContents,
-                                                  List<String> 
columnsToInclude) throws IOException {
+  private List<EventProperty> getAppendProperties(
+      String fileContents,
+      List<String> columnsToInclude
+  ) throws IOException {
     CSVParser parser = getCsvParser(fileContents);
     List<EventProperty> propertiesToAppend = new ArrayList<>();
     List<CSVRecord> records = parser.getRecords();
@@ -166,8 +184,10 @@ public class CsvMetadataEnrichmentProcessor
     return CsvMetadataEnrichmentUtils.getGuessedEventProperty(column, 
firstRecord);
   }
 
-  private List<Option> getOptionsFromColumnNames(String fileContents,
-                                                 List<String> columnsToIgnore) 
throws IOException {
+  private List<Option> getOptionsFromColumnNames(
+      String fileContents,
+      List<String> columnsToIgnore
+  ) throws IOException {
     return getColumnNames(fileContents, columnsToIgnore)
         .stream()
         .map(Option::new)
@@ -180,13 +200,15 @@ public class CsvMetadataEnrichmentProcessor
         .getHeaderMap()
         .keySet()
         .stream()
-        .filter(key -> columnsToIgnore.stream().noneMatch(c -> c.equals(key)))
+        .filter(key -> columnsToIgnore.stream()
+                                      .noneMatch(c -> c.equals(key)))
         .collect(Collectors.toList());
   }
 
   private String getFileContents(IParameterExtractor extractor) {
     String filename = extractor.selectedFilename(CSV_FILE_KEY);
-    return 
getStreamPipesClientInstance().fileApi().getFileContentAsString(filename);
+    return getStreamPipesClientInstance().fileApi()
+                                         .getFileContentAsString(filename);
   }
 
   private StreamPipesClient getStreamPipesClientInstance() {
@@ -194,15 +216,16 @@ public class CsvMetadataEnrichmentProcessor
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataProcessorParameters parameters,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
     var extractor = parameters.extractor();
     mappingFieldSelector = extractor.mappingPropertyValue(MAPPING_FIELD_KEY);
     List<String> fieldsToAppend = 
extractor.selectedMultiValues(FIELDS_TO_APPEND_KEY, String.class);
     matchingColumn = extractor.selectedSingleValue(FIELD_TO_MATCH, 
String.class);
     String fileContents = getFileContents(extractor);
-
     try {
       makeColumnMap(fileContents);
     } catch (IOException e) {
@@ -211,7 +234,14 @@ public class CsvMetadataEnrichmentProcessor
     if (!this.columnMap.isEmpty()) {
       this.columnsToAppend = fieldsToAppend
           .stream()
-          .map(c -> makeParser(c, 
this.columnMap.entrySet().stream().findFirst().get().getValue()))
+          .map(c -> makeParser(
+              c,
+              this.columnMap.entrySet()
+                            .stream()
+                            .findFirst()
+                            .get()
+                            .getValue()
+          ))
           .collect(Collectors.toList());
     } else {
       LOG.warn("Could not find any rows, does the CSV file contain data?");
@@ -220,9 +250,11 @@ public class CsvMetadataEnrichmentProcessor
   }
 
   @Override
-  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+  public void onEvent(Event event, SpOutputCollector collector) {
     String lookupValue =
-        
event.getFieldBySelector(mappingFieldSelector).getAsPrimitive().getAsString();
+        event.getFieldBySelector(mappingFieldSelector)
+             .getAsPrimitive()
+             .getAsString();
     CSVRecord record = this.columnMap.get(lookupValue);
     for (Tuple2<String, PrimitiveTypeParser> columnToAppend : columnsToAppend) 
{
       event.addField(columnToAppend.k, getRecordValueOrDefault(record, 
columnToAppend));
@@ -231,12 +263,14 @@ public class CsvMetadataEnrichmentProcessor
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     this.columnMap = new HashMap<>();
   }
 
-  private Object getRecordValueOrDefault(CSVRecord record, Tuple2<String,
-      PrimitiveTypeParser> columnToAppend) {
+  private Object getRecordValueOrDefault(
+      CSVRecord record, Tuple2<String,
+          PrimitiveTypeParser> columnToAppend
+  ) {
     if (record != null) {
       return columnToAppend.v.parse(record.get(columnToAppend.k));
     } else {
@@ -244,8 +278,10 @@ public class CsvMetadataEnrichmentProcessor
     }
   }
 
-  private Tuple2<String, PrimitiveTypeParser> makeParser(String columnName,
-                                                         CSVRecord record) {
+  private Tuple2<String, PrimitiveTypeParser> makeParser(
+      String columnName,
+      CSVRecord record
+  ) {
     Datatypes columnDatatype = 
CsvMetadataEnrichmentUtils.getGuessDatatype(columnName, record);
     if (columnDatatype.equals(Datatypes.Float)) {
       return new Tuple2<>(columnName, new FloatParser());
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
index 7c4e2801aa..5e6bc253a0 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java
@@ -18,17 +18,18 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.datetime;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -36,8 +37,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -48,7 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class DateTimeFromStringProcessor extends StreamPipesDataProcessor {
+public class DateTimeFromStringProcessor implements IStreamPipesDataProcessor {
 
   public static final String FIELD_ID = "inputField";
   public static final String OUTPUT_TIMESTAMP_RUNTIME_NAME = 
"timestringInMillis";
@@ -59,42 +58,42 @@ public class DateTimeFromStringProcessor extends 
StreamPipesDataProcessor {
   private String selectedTimeZone;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.datetime", 0)
-        .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-                                                 
.requiredPropertyWithUnaryMapping(
-                                                     
EpRequirements.stringReq(),
-                                                     Labels.withId(FIELD_ID),
-                                                     PropertyScope.NONE
-                                                 )
-                                                 .build())
-        .requiredSingleValueSelection(Labels.withId(INPUT_TIMEZONE_KEY),
-                                      Options.from(getTimeZoneOptions()), true
-        )
-        .outputStrategy(
-            OutputStrategies.append(
-                EpProperties.timestampProperty(OUTPUT_TIMESTAMP_RUNTIME_NAME),
-                EpProperties.stringEp(
-                    // We can use the labels from the input timezone here
-                    Labels.withId(INPUT_TIMEZONE_KEY),
-                    OUTPUT_TIMEZONE_RUNTIME_NAME,
-                    SO.SCHEDULE_TIMEZONE
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        DateTimeFromStringProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.datetime", 0)
+            .category(DataProcessorType.STRING_OPERATOR, 
DataProcessorType.TIME)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.stringReq(),
+                    Labels.withId(FIELD_ID),
+                    PropertyScope.NONE
+                )
+                .build())
+            .requiredSingleValueSelection(Labels.withId(INPUT_TIMEZONE_KEY),
+                Options.from(getTimeZoneOptions()), true)
+            .outputStrategy(
+                OutputStrategies.append(
+                    
EpProperties.timestampProperty(OUTPUT_TIMESTAMP_RUNTIME_NAME),
+                    EpProperties.stringEp(
+                        Labels.withId(INPUT_TIMEZONE_KEY),
+                        OUTPUT_TIMEZONE_RUNTIME_NAME,
+                        SO.SCHEDULE_TIMEZONE
+                    )
                 )
             )
-        )
-        .build();
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(
-      ProcessorParams parameters, SpOutputCollector spOutputCollector,
-      EventProcessorRuntimeContext runtimeContext
-  ) throws SpRuntimeException {
-    ProcessingElementParameterExtractor extractor = parameters.extractor();
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
+    var extractor = parameters.extractor();
     this.streamInputDateTimeFieldName = 
extractor.mappingPropertyValue(FIELD_ID);
     this.selectedTimeZone = extractor.selectedSingleValue(INPUT_TIMEZONE_KEY, 
String.class);
   }
@@ -106,7 +105,6 @@ public class DateTimeFromStringProcessor extends 
StreamPipesDataProcessor {
                                  .getAsString();
     DateTimeFormatter dtFormatter = DateTimeFormatter.ISO_DATE_TIME;
     ZonedDateTime zdt = parseDateTime(dateTimeString, dtFormatter);
-
     /*
      * A temporary workaround is in place to put a long represent the
      * zonedDateTimeVariable One possible workaround is to use the time zone 
and the
@@ -119,13 +117,11 @@ public class DateTimeFromStringProcessor extends 
StreamPipesDataProcessor {
            .toEpochMilli()
     );
     event.addField(OUTPUT_TIMEZONE_RUNTIME_NAME, selectedTimeZone);
-
     collector.collect(event);
   }
 
   @Override
-  public void onDetach() {
-
+  public void onPipelineStopped() {
   }
 
   private ZonedDateTime parseDateTime(String dateTimeString, DateTimeFormatter 
dtf) {
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
index b61742f89b..bea2703d43 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/measurementconverter/MeasurementUnitConverterProcessor.java
@@ -21,11 +21,13 @@ package 
org.apache.streampipes.processors.transformation.jvm.processor.measureme
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import 
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
@@ -35,13 +37,12 @@ import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticP
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.PropertyRequirementsBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.helpers.TransformOperations;
 import org.apache.streampipes.units.UnitProvider;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import com.github.jqudt.Quantity;
 import com.github.jqudt.Unit;
@@ -51,8 +52,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class MeasurementUnitConverterProcessor extends StreamPipesDataProcessor
-    implements ResolvesContainerProvidedOptions {
+public class MeasurementUnitConverterProcessor implements 
IStreamPipesDataProcessor,
+    ResolvesContainerProvidedOptions {
 
   private static final String CONVERT_PROPERTY = "convert-property";
   private static final String OUTPUT_UNIT = "output-unit";
@@ -63,35 +64,39 @@ public class MeasurementUnitConverterProcessor extends 
StreamPipesDataProcessor
   private String convertProperty;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.measurementunitconverter",
 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(PropertyRequirementsBuilder
-                    .create()
-                    .measurementUnitPresence()
-                    .build(),
-                Labels.withId(CONVERT_PROPERTY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .build())
-        .requiredSingleValueSelectionFromContainer(Labels.withId(OUTPUT_UNIT))
-        .outputStrategy(OutputStrategies.transform(TransformOperations
-            .dynamicMeasurementUnitTransformation(CONVERT_PROPERTY, 
OUTPUT_UNIT)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        MeasurementUnitConverterProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.measurementunitconverter",
 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(PropertyRequirementsBuilder
+                        .create()
+                        .measurementUnitPresence()
+                        .build(),
+                    Labels.withId(CONVERT_PROPERTY),
+                    PropertyScope.MEASUREMENT_PROPERTY)
+                .build())
+            
.requiredSingleValueSelectionFromContainer(Labels.withId(OUTPUT_UNIT))
+            .outputStrategy(OutputStrategies.transform(TransformOperations
+                .dynamicMeasurementUnitTransformation(CONVERT_PROPERTY, 
OUTPUT_UNIT)))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
 
     this.convertProperty = extractor.mappingPropertyValue(CONVERT_PROPERTY);
     String runtimeName = 
extractor.getEventPropertyBySelector(this.convertProperty).getRuntimeName();
     String inputUnitId = extractor.measurementUnit(runtimeName, 0);
-    String outputUnitId = 
parameters.getGraph().getStaticProperties().stream().filter(sp -> sp
+    String outputUnitId = 
parameters.getModel().getStaticProperties().stream().filter(sp -> sp
             .getInternalName().equals(OUTPUT_UNIT))
         .map(sp ->
             (RuntimeResolvableOneOfStaticProperty) sp)
@@ -105,7 +110,7 @@ public class MeasurementUnitConverterProcessor extends 
StreamPipesDataProcessor
   }
 
   @Override
-  public void onEvent(Event in, SpOutputCollector out) throws 
SpRuntimeException {
+  public void onEvent(Event in, SpOutputCollector out) {
     double value = 
in.getFieldBySelector(convertProperty).getAsPrimitive().getAsDouble();
 
     // transform old value to new unit
@@ -120,8 +125,7 @@ public class MeasurementUnitConverterProcessor extends 
StreamPipesDataProcessor
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 
   @Override
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
index 51fc61494a..9885494637 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/round/RoundProcessor.java
@@ -18,23 +18,23 @@
 
 package org.apache.streampipes.processors.transformation.jvm.processor.round;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
@@ -42,7 +42,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class RoundProcessor extends StreamPipesDataProcessor {
+public class RoundProcessor implements IStreamPipesDataProcessor {
   private List<String> fieldsToBeRounded;
   private int numDigits;
   private String roundingMode;
@@ -63,33 +63,37 @@ public class RoundProcessor extends 
StreamPipesDataProcessor {
   };
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.transformation.jvm.round", 
0)
-        .category(DataProcessorType.TRANSFORM)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithNaryMapping(EpRequirements.numberReq(), 
Labels.withId(FIELDS), PropertyScope.NONE)
-            .build())
-        .requiredIntegerParameter(Labels.withId(NUM_DIGITS))
-        .requiredSingleValueSelection(Labels.withId(ROUNDING_MODE),
-            Options.from(ROUNDING_MODE_MAP.keySet().toArray(new String[0])))
-        .outputStrategy(OutputStrategies.keep())
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        RoundProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.round", 0)
+            .category(DataProcessorType.TRANSFORM)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithNaryMapping(EpRequirements.numberReq(), 
Labels.withId(FIELDS), PropertyScope.NONE)
+                .build())
+            .requiredIntegerParameter(Labels.withId(NUM_DIGITS))
+            .requiredSingleValueSelection(Labels.withId(ROUNDING_MODE),
+                Options.from(ROUNDING_MODE_MAP.keySet().toArray(new 
String[0])))
+            .outputStrategy(OutputStrategies.keep())
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     fieldsToBeRounded = parameters.extractor().mappingPropertyValues(FIELDS);
     numDigits = parameters.extractor().singleValueParameter(NUM_DIGITS, 
Integer.class);
     roundingMode = parameters.extractor().selectedSingleValue(ROUNDING_MODE, 
String.class);
   }
 
   @Override
-  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+  public void onEvent(Event event, SpOutputCollector collector) {
     for (String fieldToBeRounded : fieldsToBeRounded) {
       double value = 
event.getFieldBySelector(fieldToBeRounded).getAsPrimitive().getAsDouble();
       double roundedValue =
@@ -100,6 +104,7 @@ public class RoundProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() {
+  public void onPipelineStopped() {
   }
-}
\ No newline at end of file
+}
+
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
index 1233aed153..744b9828e2 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/StateUtils.java
@@ -18,11 +18,11 @@
 
 package org.apache.streampipes.processors.transformation.jvm.processor.state;
 
+import 
org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
 import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
 import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
 import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -36,15 +36,15 @@ public class StateUtils {
   public static final String LABEL_STRING_ID = "labelStringId";
 
 
-  public static String getLabelName(ProcessingElementParameterExtractor 
extractor) {
+  public static String getLabelName(IDataProcessorParameterExtractor 
extractor) {
     return extractor.textParameter(LABEL_NAME);
   }
 
-  public static List<StaticPropertyGroup> 
getGroupItems(ProcessingElementParameterExtractor extractor) {
+  public static List<StaticPropertyGroup> 
getGroupItems(IDataProcessorParameterExtractor extractor) {
     return extractor.collectionMembersAsGroup(LABEL_COLLECTION_ID);
   }
 
-  public static List<Double> 
getNumberValues(ProcessingElementParameterExtractor extractor) {
+  public static List<Double> getNumberValues(IDataProcessorParameterExtractor 
extractor) {
     return getGroupItems(extractor)
         .stream()
         .map(group -> (
@@ -56,7 +56,7 @@ public class StateUtils {
         .collect(Collectors.toList());
   }
 
-  public static List<String> 
getLabelStrings(ProcessingElementParameterExtractor extractor) {
+  public static List<String> getLabelStrings(IDataProcessorParameterExtractor 
extractor) {
     return getGroupItems(extractor)
         .stream()
         .map(group -> (extractor
@@ -66,7 +66,7 @@ public class StateUtils {
         .collect(Collectors.toList());
   }
 
-  public static List<String> 
getComparators(ProcessingElementParameterExtractor extractor) {
+  public static List<String> getComparators(IDataProcessorParameterExtractor 
extractor) {
     return getGroupItems(extractor)
         .stream()
         .map(group -> (extractor
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
index 734ff69d09..f706339422 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/buffer/StateBufferProcessor.java
@@ -18,15 +18,17 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.state.buffer;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,16 +36,13 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class StateBufferProcessor extends StreamPipesDataProcessor {
+public class StateBufferProcessor implements IStreamPipesDataProcessor {
 
   public static final String TIMESTAMP_FIELD_ID = "timestampId";
   public static final String STATE_FIELD_ID = "stateId";
@@ -59,40 +58,42 @@ public class StateBufferProcessor extends 
StreamPipesDataProcessor {
 
   private Map<String, List> stateBuffer;
 
-
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.buffer",
 0)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.timestampReq(),
-                Labels.withId(TIMESTAMP_FIELD_ID),
-                PropertyScope.HEADER_PROPERTY)
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.semanticTypeReqList(SPSensor.STATE),
-                Labels.withId(STATE_FIELD_ID),
-                PropertyScope.NONE)
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.numberReq(),
-                Labels.withId(SENSOR_VALUE_FIELD_ID),
-                PropertyScope.MEASUREMENT_PROPERTY)
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        StateBufferProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.buffer",
 0)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.timestampReq(),
+                    Labels.withId(TIMESTAMP_FIELD_ID),
+                    PropertyScope.HEADER_PROPERTY)
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.semanticTypeReqList(SPSensor.STATE),
+                    Labels.withId(STATE_FIELD_ID),
+                    PropertyScope.NONE)
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.numberReq(),
+                    Labels.withId(SENSOR_VALUE_FIELD_ID),
+                    PropertyScope.MEASUREMENT_PROPERTY)
+                .build()
+            )
+            .outputStrategy(OutputStrategies.fixed(
+                EpProperties.timestampProperty(TIMESTAMP),
+                EpProperties.listDoubleEp(Labels.withId(VALUES), VALUES, 
SO.NUMBER),
+                EpProperties.listStringEp(Labels.withId(STATE), STATE, 
SPSensor.STATE)
+            ))
             .build()
-        )
-        .outputStrategy(OutputStrategies.fixed(
-            EpProperties.timestampProperty(TIMESTAMP),
-            EpProperties.listDoubleEp(Labels.withId(VALUES), VALUES, 
SO.NUMBER),
-            EpProperties.listStringEp(Labels.withId(STATE), STATE, 
SPSensor.STATE)
-        ))
-        .build();
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                                SpOutputCollector spOutputCollector,
+                                EventProcessorRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     timeProperty = extractor.mappingPropertyValue(TIMESTAMP_FIELD_ID);
     stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
@@ -101,21 +102,14 @@ public class StateBufferProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onEvent(Event inputEvent,
-                      SpOutputCollector collector) throws SpRuntimeException {
+  public void onEvent(Event inputEvent, SpOutputCollector collector) {
     long timestamp = 
inputEvent.getFieldBySelector(this.timeProperty).getAsPrimitive().getAsLong();
     List<String> states = 
inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
     double value = 
inputEvent.getFieldBySelector(this.sensorValueProperty).getAsPrimitive().getAsDouble();
 
     // add value to state buffer
     for (String state : states) {
-      if (stateBuffer.containsKey(state)) {
-        stateBuffer.get(state).add(value);
-      } else {
-        List tmp = new ArrayList();
-        tmp.add(value);
-        stateBuffer.put(state, tmp);
-      }
+      stateBuffer.computeIfAbsent(state, k -> new ArrayList<>()).add(value);
     }
 
     // emit event if state is not in event anymore
@@ -124,20 +118,18 @@ public class StateBufferProcessor extends 
StreamPipesDataProcessor {
       if (!states.contains(key)) {
         Event resultEvent = new Event();
         resultEvent.addField(StateBufferProcessor.VALUES, 
stateBuffer.get(key));
-        resultEvent.addField(StateBufferProcessor.STATE, Arrays.asList(key));
+        resultEvent.addField(StateBufferProcessor.STATE, List.of(key));
         resultEvent.addField(StateBufferProcessor.TIMESTAMP, timestamp);
         collector.collect(resultEvent);
         keysToRemove.add(key);
       }
     }
-
     for (String s : keysToRemove) {
       stateBuffer.remove(s);
     }
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
index 8a09b578ac..08b2601dbb 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerProcessor.java
@@ -18,13 +18,14 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import 
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventSchema;
@@ -35,6 +36,7 @@ import 
org.apache.streampipes.processors.transformation.jvm.processor.state.labe
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
@@ -43,8 +45,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import com.google.common.math.Stats;
 
@@ -60,8 +60,8 @@ import static 
org.apache.streampipes.processors.transformation.jvm.processor.sta
 import static 
org.apache.streampipes.processors.transformation.jvm.processor.state.StateUtils.getLabelStrings;
 import static 
org.apache.streampipes.processors.transformation.jvm.processor.state.StateUtils.getNumberValues;
 
-public class StateBufferLabelerProcessor extends StreamPipesDataProcessor
-    implements 
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, 
ProcessingElementParameterExtractor> {
+public class StateBufferLabelerProcessor implements IStreamPipesDataProcessor,
+    ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, 
ProcessingElementParameterExtractor> {
 
   public static final String STATE_FILTER_ID = "stateFilterId";
   public static final String STATE_FIELD_ID = "stateFieldId";
@@ -82,69 +82,65 @@ public class StateBufferLabelerProcessor extends 
StreamPipesDataProcessor
   private List<Statement> statements;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer",
 0)
-        .category(DataProcessorType.STRING_OPERATOR)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.listRequirement(),
-                Labels.withId(SENSOR_VALUE_ID),
-                PropertyScope.NONE)
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.semanticTypeReqList(SPSensor.STATE),
-                Labels.withId(STATE_FIELD_ID),
-                PropertyScope.NONE)
-            .build())
-        .requiredTextParameter(Labels.withId(LABEL_NAME))
-        .requiredTextParameter(Labels.withId(STATE_FILTER_ID))
-        .requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
-            Options.from(MINIMUM, MAXIMUM, AVERAGE))
-        .requiredCollection(
-            Labels.withId(LABEL_COLLECTION_ID),
-            StaticProperties.group(Labels.from("group", "Group", ""), false,
-                
StaticProperties.singleValueSelection(Labels.withId(COMPARATOR_ID),
-                    Options.from("<", "<=", ">", ">=", "==", "*")),
-                
StaticProperties.doubleFreeTextProperty(Labels.withId(NUMBER_VALUE_ID)),
-                
StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID))
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        StateBufferLabelerProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer",
 0)
+            .category(DataProcessorType.STRING_OPERATOR)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.listRequirement(),
+                    Labels.withId(SENSOR_VALUE_ID),
+                    PropertyScope.NONE)
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.semanticTypeReqList(SPSensor.STATE),
+                    Labels.withId(STATE_FIELD_ID),
+                    PropertyScope.NONE)
+                .build())
+            .requiredTextParameter(Labels.withId(LABEL_NAME))
+            .requiredTextParameter(Labels.withId(STATE_FILTER_ID))
+            .requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
+                Options.from(MINIMUM, MAXIMUM, AVERAGE))
+            .requiredCollection(
+                Labels.withId(LABEL_COLLECTION_ID),
+                StaticProperties.group(Labels.from("group", "Group", ""), 
false,
+                    
StaticProperties.singleValueSelection(Labels.withId(COMPARATOR_ID),
+                        Options.from("<", "<=", ">", ">=", "==", "*")),
+                    
StaticProperties.doubleFreeTextProperty(Labels.withId(NUMBER_VALUE_ID)),
+                    
StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID))
+                )
             )
-        )
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.stringEp(Labels.withId(LABEL), LABEL, SPSensor.STATE, 
PropertyScope.DIMENSION_PROPERTY)
-        ))
-        .build();
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.stringEp(Labels.withId(LABEL), LABEL, 
SPSensor.STATE, PropertyScope.DIMENSION_PROPERTY)
+            ))
+            .build()
+    );
   }
 
   @Override
   public EventSchema resolveOutputStrategy(DataProcessorInvocation 
processingElement,
-                                           ProcessingElementParameterExtractor 
parameterExtractor)
-      throws SpRuntimeException {
-
+                                           ProcessingElementParameterExtractor 
parameterExtractor) {
     String labelName = getLabelName(parameterExtractor);
-
     List<String> labelStrings = getLabelStrings(parameterExtractor);
-
     return LabelerUtils.resolveOutputStrategy(processingElement, labelName, 
labelStrings);
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                                SpOutputCollector spOutputCollector,
+                                EventProcessorRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     sensorListValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_ID);
     stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
     stateFilter = extractor.singleValueParameter(STATE_FILTER_ID, 
String.class);
     selectedOperation = extractor.selectedSingleValue(OPERATIONS_ID, 
String.class);
-
     labelName = getLabelName(extractor);
-
     List<Double> numberValues = getNumberValues(extractor);
     List<String> labelStrings = getLabelStrings(extractor);
     List<String> comparators = getComparators(extractor);
-
     statements = StatementUtils.getStatements(
         numberValues,
         labelStrings,
@@ -152,15 +148,13 @@ public class StateBufferLabelerProcessor extends 
StreamPipesDataProcessor
   }
 
   @Override
-  public void onEvent(Event inputEvent,
-                      SpOutputCollector collector) throws SpRuntimeException {
+  public void onEvent(Event inputEvent, SpOutputCollector collector) {
     List<Double> values =
         
inputEvent.getFieldBySelector(this.sensorListValueProperty).getAsList().parseAsSimpleType(Double.class);
     List<String> states = 
inputEvent.getFieldBySelector(this.stateProperty).getAsList().parseAsSimpleType(String.class);
 
     if (states.contains(this.stateFilter) || this.stateFilter.equals("*")) {
       double calculatedValue;
-
       if (StateBufferLabelerProcessor.MAXIMUM.equals(this.selectedOperation)) {
         calculatedValue = Stats.of(values).max();
       } else if 
(StateBufferLabelerProcessor.MINIMUM.equals(this.selectedOperation)) {
@@ -168,14 +162,12 @@ public class StateBufferLabelerProcessor extends 
StreamPipesDataProcessor
       } else {
         calculatedValue = Stats.of(values).mean();
       }
-
       Event resultEvent = StatementUtils.addLabel(inputEvent, labelName, 
calculatedValue, this.statements);
       collector.collect(resultEvent);
     }
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
index 44fa065b28..552f0a450c 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/task/TaskDurationProcessor.java
@@ -17,26 +17,26 @@
  */
 package org.apache.streampipes.processors.transformation.jvm.processor.task;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class TaskDurationProcessor extends StreamPipesDataProcessor {
+public class TaskDurationProcessor implements IStreamPipesDataProcessor {
 
   private static final String TASK_FIELD_KEY = "task-field";
   private static final String TIMESTAMP_FIELD_KEY = "timestamp-field";
@@ -56,34 +56,36 @@ public class TaskDurationProcessor extends 
StreamPipesDataProcessor {
   private Long lastTimestamp;
   private Double outputDivisor;
 
-
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.transformation.jvm.taskduration", 0)
-        .category(DataProcessorType.TIME)
-        .withLocales(Locales.EN)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.anyProperty(),
-                Labels.withId(TASK_FIELD_KEY),
-                PropertyScope.NONE)
-            .requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
-                Labels.withId(TIMESTAMP_FIELD_KEY), PropertyScope.NONE)
-            .build())
-        .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), 
Options.from(MILLISECONDS, SECONDS, MINUTES))
-        
.outputStrategy(OutputStrategies.fixed(EpProperties.stringEp(Labels.withId(TASK_ID),
-                "processId", "http://schema.org/taskId"),
-            EpProperties.integerEp(Labels.withId(DURATION_ID), "duration",
-                "http://schema.org/duration")))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        TaskDurationProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.transformation.jvm.taskduration", 0)
+            .category(DataProcessorType.TIME)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.anyProperty(),
+                    Labels.withId(TASK_FIELD_KEY),
+                    PropertyScope.NONE)
+                
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
+                    Labels.withId(TIMESTAMP_FIELD_KEY), PropertyScope.NONE)
+                .build())
+            .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), 
Options.from(MILLISECONDS, SECONDS, MINUTES))
+            
.outputStrategy(OutputStrategies.fixed(EpProperties.stringEp(Labels.withId(TASK_ID),
+                    "processId", "http://schema.org/taskId"),
+                EpProperties.integerEp(Labels.withId(DURATION_ID), "duration",
+                    "http://schema.org/duration")))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters parameters,
+                               SpOutputCollector spOutputCollector,
+                               EventProcessorRuntimeContext runtimeContext) {
     var extractor = parameters.extractor();
     taskFieldSelector = extractor.mappingPropertyValue(TASK_FIELD_KEY);
     timestampFieldSelector = 
extractor.mappingPropertyValue(TIMESTAMP_FIELD_KEY);
@@ -99,7 +101,7 @@ public class TaskDurationProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event,
-                      SpOutputCollector collector) throws SpRuntimeException {
+                      SpOutputCollector collector) {
     String taskValue = 
event.getFieldBySelector(taskFieldSelector).getAsPrimitive().getAsString();
     Long timestampValue =
         
event.getFieldBySelector(timestampFieldSelector).getAsPrimitive().getAsLong();
@@ -111,7 +113,6 @@ public class TaskDurationProcessor extends 
StreamPipesDataProcessor {
       if (!this.lastValue.equals(taskValue)) {
         Long duration = timestampValue - this.lastTimestamp;
 
-
         double result = duration / this.outputDivisor;
 
         Event outEvent = new Event();
@@ -131,7 +132,7 @@ public class TaskDurationProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
     this.lastValue = null;
   }
 }

Reply via email to