This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new d0294a79ae refactor(#3652): Update processors to implement 
IStreamPipesDataProcessor interface (#3664)
d0294a79ae is described below

commit d0294a79aeacd15319e863efcf30c31e241dc46a
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jul 11 09:02:28 2025 +0200

    refactor(#3652): Update processors to implement IStreamPipesDataProcessor 
interface (#3664)
---
 .../buffergeometry/BufferGeomProcessor.java        | 195 ++++++++++++---------
 .../bufferpoint/BufferPointProcessor.java          | 145 ++++++++-------
 .../geo/jvm/jts/processor/epsg/EpsgProcessor.java  |  56 +++---
 .../LatLngToJtsPointProcessor.java                 | 101 ++++++-----
 .../reprojection/ReprojectionProcessor.java        |  90 ++++++----
 .../trajectory/TrajectoryFromPointsProcessor.java  | 148 +++++++++-------
 .../complex/TopologyValidationProcessor.java       | 116 +++++++-----
 .../simple/GeometryValidationProcessor.java        | 115 ++++++------
 .../HaversineDistanceCalculatorProcessor.java      | 114 +++++++-----
 ...HaversineStaticDistanceCalculatorProcessor.java | 103 ++++++-----
 .../googlemaps/GoogleMapsGeocoderProcessor.java    |  77 ++++----
 .../GoogleMapsStaticGeocoderProcessor.java         |  77 ++++----
 .../geocityname/GeoCityNameRevdecodeProcessor.java |  86 +++++----
 .../speedcalculator/SpeedCalculatorProcessor.java  | 102 ++++++-----
 14 files changed, 890 insertions(+), 635 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
index 99c9dea163..bf01036df5 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
@@ -18,11 +18,13 @@
 package org.apache.streampipes.processors.geo.jvm.jts.processor.buffergeometry;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -33,21 +35,20 @@ import 
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.JoinStyle;
 import 
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.SpBufferBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.locationtech.jts.geom.Geometry;
 import org.opengis.util.FactoryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BufferGeomProcessor extends StreamPipesDataProcessor {
+public class BufferGeomProcessor implements IStreamPipesDataProcessor {
   public static final String GEOM_KEY = "geometry-key";
   public static final String EPSG_KEY = "epsg-key";
   public static final String CAP_KEY = "cap-style-key";
@@ -72,80 +73,92 @@ public class BufferGeomProcessor extends 
StreamPipesDataProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(BufferGeomProcessor.class);
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.buffergeometry",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
-                Labels.withId(GEOM_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
-                Labels.withId(EPSG_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-                EpProperties.stringEp(
-                    Labels.withId(GEOM_KEY),
-                    GEOM_RUNTIME,
-                    "http://www.opengis.net/ont/geosparql#Geometry"
-                ),
-                EpProperties.numberEp(
-                    Labels.withId(EPSG_KEY),
-                    EPSG_RUNTIME,
-                    "http://data.ign.fr/def/ignf#CartesianCS"
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        BufferGeomProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.buffergeometry",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+                                    Labels.withId(GEOM_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                                    Labels.withId(EPSG_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.append(
+                                EpProperties.stringEp(
+                                    Labels.withId(GEOM_KEY),
+                                    GEOM_RUNTIME,
+                                    
"http://www.opengis.net/ont/geosparql#Geometry"
+                                ),
+                                EpProperties.numberEp(
+                                    Labels.withId(EPSG_KEY),
+                                    EPSG_RUNTIME,
+                                    "http://data.ign.fr/def/ignf#CartesianCS"
+                                )
+                            )
+            )
+            .requiredSingleValueSelection(
+                Labels.withId(CAP_KEY),
+                Options.from(
+                    CapStyle.Square.name(),
+                    CapStyle.Flat.name(),
+                    CapStyle.Round.name()
+                )
+            )
+            .requiredSingleValueSelection(
+                Labels.withId(JOIN_KEY),
+                Options.from(
+                    JoinStyle.Bevel.name(),
+                    JoinStyle.Mitre.name(),
+                    JoinStyle.Round.name()
+                )
+            )
+            .requiredSingleValueSelection(
+                Labels.withId(SIDE_KEY),
+                Options.from(
+                    BufferSide.Both.name(),
+                    BufferSide.Left.name(),
+                    BufferSide.Right.name()
                 )
             )
-        )
-        .requiredSingleValueSelection(
-            Labels.withId(CAP_KEY),
-            Options.from(
-                CapStyle.Square.name(),
-                CapStyle.Flat.name(),
-                CapStyle.Round.name())
-        )
-        .requiredSingleValueSelection(
-            Labels.withId(JOIN_KEY),
-            Options.from(
-                JoinStyle.Bevel.name(),
-                JoinStyle.Mitre.name(),
-                JoinStyle.Round.name())
-        )
-        .requiredSingleValueSelection(
-            Labels.withId(SIDE_KEY),
-            Options.from(
-                BufferSide.Both.name(),
-                BufferSide.Left.name(),
-                BufferSide.Right.name())
-        )
-        .requiredIntegerParameter(
-            Labels.withId(MITRE_LIMIT_KEY),
-            5)
-        .requiredIntegerParameter(
-            Labels.withId(SEGMENTS_KEY),
-            8
-        )
-        .requiredFloatParameter(
-            Labels.withId(SIMPLIFY_FACTOR_KEY),
-            0.01f
-        )
-        .requiredFloatParameter(
-            Labels.withId(DISTANCE_KEY)
-        )
-        .build();
+            .requiredIntegerParameter(
+                Labels.withId(MITRE_LIMIT_KEY),
+                5
+            )
+            .requiredIntegerParameter(
+                Labels.withId(SEGMENTS_KEY),
+                8
+            )
+            .requiredFloatParameter(
+                Labels.withId(SIMPLIFY_FACTOR_KEY),
+                0.01f
+            )
+            .requiredFloatParameter(
+                Labels.withId(DISTANCE_KEY)
+            )
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
 
     try {
-      if (SpReprojectionBuilder.isSisConfigurationValid()){
+      if (SpReprojectionBuilder.isSisConfigurationValid()) {
         LOG.info("SIS DB Settings successful checked ");
       } else {
         LOG.warn("The required EPSG database is not imported");
@@ -155,15 +168,24 @@ public class BufferGeomProcessor extends 
StreamPipesDataProcessor {
       throw new SpRuntimeException("Something unexpected happened " + e);
     }
 
-    this.geometryMapper = 
parameters.extractor().mappingPropertyValue(GEOM_KEY);
-    this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
-    String readCapStyle = parameters.extractor().selectedSingleValue(CAP_KEY, 
String.class);
-    String readJoinStyle = 
parameters.extractor().selectedSingleValue(JOIN_KEY, String.class);
-    String readSide = parameters.extractor().selectedSingleValue(SIDE_KEY, 
String.class);
-    this.mitreLimit = 
parameters.extractor().singleValueParameter(MITRE_LIMIT_KEY, Double.class);
-    this.segments = parameters.extractor().singleValueParameter(SEGMENTS_KEY, 
Integer.class);
-    this.simplifyFactor = 
parameters.extractor().singleValueParameter(SIMPLIFY_FACTOR_KEY, Double.class);
-    this.distance = parameters.extractor().singleValueParameter(DISTANCE_KEY, 
Double.class);
+    this.geometryMapper = params.extractor()
+                                .mappingPropertyValue(GEOM_KEY);
+    this.epsgMapper = params.extractor()
+                            .mappingPropertyValue(EPSG_KEY);
+    String readCapStyle = params.extractor()
+                                .selectedSingleValue(CAP_KEY, String.class);
+    String readJoinStyle = params.extractor()
+                                 .selectedSingleValue(JOIN_KEY, String.class);
+    String readSide = params.extractor()
+                            .selectedSingleValue(SIDE_KEY, String.class);
+    this.mitreLimit = params.extractor()
+                            .singleValueParameter(MITRE_LIMIT_KEY, 
Double.class);
+    this.segments = params.extractor()
+                          .singleValueParameter(SEGMENTS_KEY, Integer.class);
+    this.simplifyFactor = params.extractor()
+                                .singleValueParameter(SIMPLIFY_FACTOR_KEY, 
Double.class);
+    this.distance = params.extractor()
+                          .singleValueParameter(DISTANCE_KEY, Double.class);
     // transform names to numbers
     this.capStyle = 1;
     if (readCapStyle.equals(CapStyle.Square.name())) {
@@ -192,8 +214,12 @@ public class BufferGeomProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String geom = 
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
-    Integer epsg = 
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+    String geom = event.getFieldBySelector(geometryMapper)
+                       .getAsPrimitive()
+                       .getAsString();
+    Integer epsg = event.getFieldBySelector(epsgMapper)
+                        .getAsPrimitive()
+                        .getAsInt();
     Geometry geometry = SpGeometryBuilder.createSPGeom(geom, epsg);
     Geometry bufferGeom =
         SpBufferBuilder.createSpBuffer(
@@ -205,7 +231,8 @@ public class BufferGeomProcessor extends 
StreamPipesDataProcessor {
             segments,
             simplifyFactor,
             singleSided,
-            side);
+            side
+        );
 
     if (!bufferGeom.isEmpty()) {
       event.addField(GEOM_RUNTIME, bufferGeom.toText());
@@ -220,6 +247,6 @@ public class BufferGeomProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
index 25a4aff818..c4ca305dd0 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
@@ -18,11 +18,13 @@
 package org.apache.streampipes.processors.geo.jvm.jts.processor.bufferpoint;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -31,14 +33,13 @@ import 
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.CapStyle;
 import 
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.SpBufferBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.Point;
@@ -47,7 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class BufferPointProcessor extends StreamPipesDataProcessor {
+public class BufferPointProcessor implements IStreamPipesDataProcessor {
   public static final String GEOM_KEY = "geometry-key";
   public static final String EPSG_KEY = "epsg-key";
   public static final String CAP_KEY = "cap-style-key";
@@ -65,62 +66,71 @@ public class BufferPointProcessor extends 
StreamPipesDataProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(BufferPointProcessor.class);
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.bufferpoint", 
0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON, 
"output.png")
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
-                Labels.withId(GEOM_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
-                Labels.withId(EPSG_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-                EpProperties.stringEp(
-                    Labels.withId(GEOM_KEY),
-                    GEOM_RUNTIME,
-                    "http://www.opengis.net/ont/geosparql#Geometry"
-                ),
-                EpProperties.numberEp(
-                    Labels.withId(EPSG_KEY),
-                    EPSG_RUNTIME,
-                    "http://data.ign.fr/def/ignf#CartesianCS"
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        BufferPointProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.bufferpoint", 
0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON, "output.png")
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+                                    Labels.withId(GEOM_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                                    Labels.withId(EPSG_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.append(
+                                EpProperties.stringEp(
+                                    Labels.withId(GEOM_KEY),
+                                    GEOM_RUNTIME,
+                                    
"http://www.opengis.net/ont/geosparql#Geometry"
+                                ),
+                                EpProperties.numberEp(
+                                    Labels.withId(EPSG_KEY),
+                                    EPSG_RUNTIME,
+                                    "http://data.ign.fr/def/ignf#CartesianCS"
+                                )
+                            )
+            )
+            .requiredSingleValueSelection(
+                Labels.withId(CAP_KEY),
+                Options.from(
+                    CapStyle.Square.name(),
+                    CapStyle.Round.name()
                 )
             )
-        )
-        .requiredSingleValueSelection(
-            Labels.withId(CAP_KEY),
-            Options.from(
-                CapStyle.Square.name(),
-                CapStyle.Round.name())
-        )
-        .requiredIntegerParameter(
-            Labels.withId(SEGMENTS_KEY),
-            8
-        )
-        .requiredFloatParameter(
-            Labels.withId(SIMPLIFY_FACTOR_KEY),
-            0.01f
-        )
-        .requiredFloatParameter(
-            Labels.withId(DISTANCE_KEY)
-        )
-        .build();
+            .requiredIntegerParameter(
+                Labels.withId(SEGMENTS_KEY),
+                8
+            )
+            .requiredFloatParameter(
+                Labels.withId(SIMPLIFY_FACTOR_KEY),
+                0.01f
+            )
+            .requiredFloatParameter(
+                Labels.withId(DISTANCE_KEY)
+            )
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
 
     try {
-      if (SpReprojectionBuilder.isSisConfigurationValid()){
+      if (SpReprojectionBuilder.isSisConfigurationValid()) {
         LOG.info("SIS DB Settings successful checked ");
       } else {
         LOG.warn("The required EPSG database is not imported");
@@ -130,12 +140,18 @@ public class BufferPointProcessor extends 
StreamPipesDataProcessor {
       throw new SpRuntimeException("Something unexpected happened " + e);
     }
 
-    this.geometryMapper = 
parameters.extractor().mappingPropertyValue(GEOM_KEY);
-    this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
-    String readCapStyle = parameters.extractor().selectedSingleValue(CAP_KEY, 
String.class);
-    this.segments = parameters.extractor().singleValueParameter(SEGMENTS_KEY, 
Integer.class);
-    this.simplifyFactor = 
parameters.extractor().singleValueParameter(SIMPLIFY_FACTOR_KEY, Double.class);
-    this.distance = parameters.extractor().singleValueParameter(DISTANCE_KEY, 
Double.class);
+    this.geometryMapper = params.extractor()
+                                .mappingPropertyValue(GEOM_KEY);
+    this.epsgMapper = params.extractor()
+                            .mappingPropertyValue(EPSG_KEY);
+    String readCapStyle = params.extractor()
+                                .selectedSingleValue(CAP_KEY, String.class);
+    this.segments = params.extractor()
+                          .singleValueParameter(SEGMENTS_KEY, Integer.class);
+    this.simplifyFactor = params.extractor()
+                                .singleValueParameter(SIMPLIFY_FACTOR_KEY, 
Double.class);
+    this.distance = params.extractor()
+                          .singleValueParameter(DISTANCE_KEY, Double.class);
 
     // transform names to numbers
     this.capStyle = 1;
@@ -148,8 +164,12 @@ public class BufferPointProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String geom = 
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
-    Integer epsg = 
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+    String geom = event.getFieldBySelector(geometryMapper)
+                       .getAsPrimitive()
+                       .getAsString();
+    Integer epsg = event.getFieldBySelector(epsgMapper)
+                        .getAsPrimitive()
+                        .getAsInt();
     Geometry geometry = SpGeometryBuilder.createSPGeom(geom, epsg);
 
 
@@ -167,7 +187,8 @@ public class BufferPointProcessor extends 
StreamPipesDataProcessor {
       LOG.warn("Only points are supported but input type is " + 
geometry.getGeometryType());
     }
   }
+
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
index 3126196d84..7a28f28db9 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
@@ -19,51 +19,59 @@
 package org.apache.streampipes.processors.geo.jvm.jts.processor.epsg;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class EpsgProcessor extends StreamPipesDataProcessor {
+public class EpsgProcessor implements IStreamPipesDataProcessor {
   private static final String EPSG_KEY = "epsg-key";
   private static final String EPSG_RUNTIME = "epsg";
 
   private int epsgCode;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.epsg", 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .build())
-        .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
-            .create(Datatypes.Integer, EPSG_RUNTIME)
-            .semanticType("http://data.ign.fr/def/ignf#CartesianCS")
-            .build())
-        )
-        .requiredIntegerParameter(Labels.withId(EPSG_KEY), 4326)
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        EpsgProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.epsg", 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .build())
+            .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+                                                        
.create(Datatypes.Integer, EPSG_RUNTIME)
+                                                        
.semanticType("http://data.ign.fr/def/ignf#CartesianCS")
+                                                        .build())
+            )
+            .requiredIntegerParameter(Labels.withId(EPSG_KEY), 4326)
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.epsgCode = parameters.extractor().singleValueParameter(EPSG_KEY, 
Integer.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.epsgCode = params.extractor()
+                          .singleValueParameter(EPSG_KEY, Integer.class);
   }
 
   @Override
@@ -73,7 +81,7 @@ public class EpsgProcessor extends StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
 
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
index 3117685f79..0a208459dc 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
@@ -19,31 +19,32 @@
 package 
org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
 import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.locationtech.jts.geom.Point;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
+public class LatLngToJtsPointProcessor implements IStreamPipesDataProcessor {
   private static final String LAT_KEY = "latitude-key";
   private static final String LNG_KEY = "longitude-key";
   private static final String EPSG_KEY = "epsg-key";
@@ -53,50 +54,69 @@ public class LatLngToJtsPointProcessor extends 
StreamPipesDataProcessor {
   private String epsgMapper;
   private static final Logger LOG = 
LoggerFactory.getLogger(LatLngToJtsPointProcessor.class);
   public static final String EPA_NAME = "Latitude Longitude To JTS Point";
+
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(
-            StreamRequirementsBuilder
-                .create()
-                
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
-                    Labels.withId(LAT_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-                .requiredPropertyWithUnaryMapping(
-                    EpRequirements.semanticTypeReq(Geo.LNG),
-                    Labels.withId(LNG_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-                .requiredPropertyWithUnaryMapping(
-                    
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
-                    Labels.withId(EPSG_KEY), 
PropertyScope.MEASUREMENT_PROPERTY)
-                .build()
-        )
-        .outputStrategy(
-            OutputStrategies.append(
-                PrimitivePropertyBuilder
-                    .create(Datatypes.String, GEOMETRY_RUNTIME)
-                    
.semanticType("http://www.opengis.net/ont/geosparql#Geometry")
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        LatLngToJtsPointProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(
+                StreamRequirementsBuilder
+                    .create()
+                    .requiredPropertyWithUnaryMapping(
+                        EpRequirements.semanticTypeReq(Geo.LAT),
+                        Labels.withId(LAT_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                    )
+                    .requiredPropertyWithUnaryMapping(
+                        EpRequirements.semanticTypeReq(Geo.LNG),
+                        Labels.withId(LNG_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                    )
+                    .requiredPropertyWithUnaryMapping(
+                        
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                        Labels.withId(EPSG_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                    )
                     .build()
             )
-        )
-        .build();
+            .outputStrategy(
+                OutputStrategies.append(
+                    PrimitivePropertyBuilder
+                        .create(Datatypes.String, GEOMETRY_RUNTIME)
+                        
.semanticType("http://www.opengis.net/ont/geosparql#Geometry")
+                        .build()
+                )
+            )
+            .build());
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.latitudeMapper = parameters.extractor().mappingPropertyValue(LAT_KEY);
-    this.longitudeMapper = 
parameters.extractor().mappingPropertyValue(LNG_KEY);
-    this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.latitudeMapper = params.extractor()
+                                .mappingPropertyValue(LAT_KEY);
+    this.longitudeMapper = params.extractor()
+                                 .mappingPropertyValue(LNG_KEY);
+    this.epsgMapper = params.extractor()
+                            .mappingPropertyValue(EPSG_KEY);
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    Double lat = 
event.getFieldBySelector(latitudeMapper).getAsPrimitive().getAsDouble();
-    Double lng = 
event.getFieldBySelector(longitudeMapper).getAsPrimitive().getAsDouble();
-    Integer epsg = 
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+    Double lat = event.getFieldBySelector(latitudeMapper)
+                      .getAsPrimitive()
+                      .getAsDouble();
+    Double lng = event.getFieldBySelector(longitudeMapper)
+                      .getAsPrimitive()
+                      .getAsDouble();
+    Integer epsg = event.getFieldBySelector(epsgMapper)
+                        .getAsPrimitive()
+                        .getAsInt();
 
     Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);
 
@@ -107,13 +127,12 @@ public class LatLngToJtsPointProcessor extends 
StreamPipesDataProcessor {
       collector.collect(event);
     } else {
       LOG.warn("An empty point geometry in " + EPA_NAME + " is created due"
-          + "invalid input field. Latitude: " + lat + "Longitude: " + lng);
+                   + "invalid input field. Latitude: " + lat + "Longitude: " + 
lng);
       LOG.error("An event is filtered out because of invalid geometry");
     }
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
index d1f8696732..78cb7297ed 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
@@ -19,11 +19,13 @@
 package org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
@@ -31,19 +33,18 @@ import 
org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
 import 
org.apache.streampipes.processors.geo.jvm.jts.helper.SpReprojectionBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.locationtech.jts.geom.Geometry;
 import org.opengis.util.FactoryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ReprojectionProcessor extends StreamPipesDataProcessor {
+public class ReprojectionProcessor implements IStreamPipesDataProcessor {
   public static final String GEOM_KEY = "geom-key";
   public static final String SOURCE_EPSG_KEY = "source-epsg-key";
   public static final String TARGET_EPSG_KEY = "target-epsg-key";
@@ -55,39 +56,49 @@ public class ReprojectionProcessor extends 
StreamPipesDataProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReprojectionProcessor.class);
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection", 
0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
-                Labels.withId(GEOM_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
-                Labels.withId(SOURCE_EPSG_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .build())
-        .outputStrategy(OutputStrategies.keep())
-        .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        ReprojectionProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection", 
0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+                                    Labels.withId(GEOM_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                                    Labels.withId(SOURCE_EPSG_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.keep())
+            .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-
-    this.geometryMapper = 
parameters.extractor().mappingPropertyValue(GEOM_KEY);
-    this.sourceEpsgMapper = 
parameters.extractor().mappingPropertyValue(SOURCE_EPSG_KEY);
-    this.targetEpsg = 
parameters.extractor().singleValueParameter(TARGET_EPSG_KEY, Integer.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.geometryMapper = params.extractor()
+                                .mappingPropertyValue(GEOM_KEY);
+    this.sourceEpsgMapper = params.extractor()
+                                  .mappingPropertyValue(SOURCE_EPSG_KEY);
+    this.targetEpsg = params.extractor()
+                            .singleValueParameter(TARGET_EPSG_KEY, 
Integer.class);
 
     // check if SIS DB is set up with imported data or is null
     try {
-      if (SpReprojectionBuilder.isSisConfigurationValid()){
+      if (SpReprojectionBuilder.isSisConfigurationValid()) {
         LOG.info("SIS DB Settings successful checked ");
       } else {
         LOG.warn("The required EPSG database is not imported");
@@ -102,7 +113,7 @@ public class ReprojectionProcessor extends 
StreamPipesDataProcessor {
       if (!SpReprojectionBuilder.isSisDbCorrectVersion()) {
         LOG.warn("Not supported EPSG DB is used.");
         throw new SpRuntimeException("Your current EPSG DB version " + 
SpReprojectionBuilder.getSisDbVersion()
-            + " is not the supported 9.9.1 version. ");
+                                         + " is not the supported 9.9.1 
version. ");
       }
     } catch (FactoryException e) {
       throw new SpRuntimeException("Something unexpected happened " + e);
@@ -111,14 +122,18 @@ public class ReprojectionProcessor extends 
StreamPipesDataProcessor {
     // checks if Input EPSG in valid and exists in EPSG DB
     if (!SpReprojectionBuilder.isSisEpsgValid(this.targetEpsg)) {
       throw new SpRuntimeException("Your chosen EPSG Code " + this.targetEpsg 
+ " is not valid. "
-          + "Check EPSG on https://spatialreference.org");
+                                       + "Check EPSG on 
https://spatialreference.org");
     }
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String geom = 
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
-    Integer sourceEpsg = 
event.getFieldBySelector(sourceEpsgMapper).getAsPrimitive().getAsInt();
+    String geom = event.getFieldBySelector(geometryMapper)
+                       .getAsPrimitive()
+                       .getAsString();
+    Integer sourceEpsg = event.getFieldBySelector(sourceEpsgMapper)
+                              .getAsPrimitive()
+                              .getAsInt();
 
     Geometry geometry = SpGeometryBuilder.createSPGeom(geom, sourceEpsg);
 
@@ -136,12 +151,11 @@ public class ReprojectionProcessor extends 
StreamPipesDataProcessor {
       collector.collect(event);
     } else {
       LOG.warn("An empty point geometry is created"
-          + " due invalid input values. Check used epsg Code:" + targetEpsg);
+                   + " due invalid input values. Check used epsg Code:" + 
targetEpsg);
     }
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
index cdc18fcdab..a4112c664f 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
@@ -19,32 +19,33 @@
 package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
 import 
org.apache.streampipes.processors.geo.jvm.jts.helper.SpTrajectoryBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.locationtech.jts.geom.LineString;
 import org.locationtech.jts.geom.Point;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
+public class TrajectoryFromPointsProcessor implements 
IStreamPipesDataProcessor {
   private static final String POINT_KEY = "point-key";
   private static final String EPSG_KEY = "epsg-key";
   private static final String M_KEY = "m-key";
@@ -63,78 +64,102 @@ public class TrajectoryFromPointsProcessor extends 
StreamPipesDataProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(TrajectoryFromPointsProcessor.class);
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory", 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(
-            StreamRequirementsBuilder
-                .create()
-                
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq
-                        ("http://www.opengis.net/ont/geosparql#Geometry"),
-                    Labels.withId(POINT_KEY), 
PropertyScope.MEASUREMENT_PROPERTY)
-                .requiredPropertyWithUnaryMapping(
-                    EpRequirements.semanticTypeReq
-                        ("http://data.ign.fr/def/ignf#CartesianCS"),
-                    Labels.withId(EPSG_KEY), 
PropertyScope.MEASUREMENT_PROPERTY)
-                .requiredPropertyWithUnaryMapping(
-                    EpRequirements.numberReq(),
-                    Labels.withId(M_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-                .build()
-        )
-        .requiredTextParameter(
-            Labels.withId(DESCRIPTION_KEY))
-        .requiredIntegerParameter(
-            Labels.withId(SUBPOINTS_KEY),
-            2, 30, 1)
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        TrajectoryFromPointsProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory", 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(
+                StreamRequirementsBuilder
+                    .create()
+                    .requiredPropertyWithUnaryMapping(
+                        EpRequirements.semanticTypeReq
+                                          
("http://www.opengis.net/ont/geosparql#Geometry"),
+                        Labels.withId(POINT_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                    )
+                    .requiredPropertyWithUnaryMapping(
+                        EpRequirements.semanticTypeReq
+                                          
("http://data.ign.fr/def/ignf#CartesianCS"),
+                        Labels.withId(EPSG_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                    )
+                    .requiredPropertyWithUnaryMapping(
+                        EpRequirements.numberReq(),
+                        Labels.withId(M_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                    )
+                    .build()
+            )
+            .requiredTextParameter(
+                Labels.withId(DESCRIPTION_KEY))
+            .requiredIntegerParameter(
+                Labels.withId(SUBPOINTS_KEY),
+                2, 30, 1
+            )
 
-        .outputStrategy(OutputStrategies.append(
-                EpProperties.stringEp(
-                    Labels.withId(DESCRIPTION_KEY),
-                    DESCRIPTION_RUNTIME,
-                    SO.TEXT),
-                EpProperties.stringEp(
-                    Labels.withId(TRAJECTORY_KEY),
-                    TRAJECTORY_GEOMETRY_RUNTIME,
-                    "http://www.opengis.net/ont/geosparql#Geometry"),
-                EpProperties.integerEp(
-                    Labels.withId(EPSG_KEY),
-                    TRAJECTORY_EPSG_RUNTIME,
-                "http://data.ign.fr/def/ignf#CartesianCS")
+            .outputStrategy(OutputStrategies.append(
+                                EpProperties.stringEp(
+                                    Labels.withId(DESCRIPTION_KEY),
+                                    DESCRIPTION_RUNTIME,
+                                    SO.TEXT
+                                ),
+                                EpProperties.stringEp(
+                                    Labels.withId(TRAJECTORY_KEY),
+                                    TRAJECTORY_GEOMETRY_RUNTIME,
+                                    
"http://www.opengis.net/ont/geosparql#Geometry"
+                                ),
+                                EpProperties.integerEp(
+                                    Labels.withId(EPSG_KEY),
+                                    TRAJECTORY_EPSG_RUNTIME,
+                                    "http://data.ign.fr/def/ignf#CartesianCS"
+                                )
+                            )
             )
-        )
-        .build();
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.pointMapper = parameters.extractor().mappingPropertyValue(POINT_KEY);
-    this.mValueMapper = parameters.extractor().mappingPropertyValue(M_KEY);
-    this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
-    this.description = 
parameters.extractor().singleValueParameter(DESCRIPTION_KEY, String.class);
-    this.subpoints = 
parameters.extractor().singleValueParameter(SUBPOINTS_KEY, Integer.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.pointMapper = params.extractor()
+                             .mappingPropertyValue(POINT_KEY);
+    this.mValueMapper = params.extractor()
+                              .mappingPropertyValue(M_KEY);
+    this.epsgMapper = params.extractor()
+                            .mappingPropertyValue(EPSG_KEY);
+    this.description = params.extractor()
+                             .singleValueParameter(DESCRIPTION_KEY, 
String.class);
+    this.subpoints = params.extractor()
+                           .singleValueParameter(SUBPOINTS_KEY, Integer.class);
 
     trajectory = new SpTrajectoryBuilder(subpoints, description);
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    // extract values
-    String point = 
event.getFieldBySelector(pointMapper).getAsPrimitive().getAsString();
-    Integer epsg = 
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
-    Double m = 
event.getFieldBySelector(mValueMapper).getAsPrimitive().getAsDouble();
-    //create JTS geometry
+    String point = event.getFieldBySelector(pointMapper)
+                        .getAsPrimitive()
+                        .getAsString();
+    Integer epsg = event.getFieldBySelector(epsgMapper)
+                        .getAsPrimitive()
+                        .getAsInt();
+    Double m = event.getFieldBySelector(mValueMapper)
+                    .getAsPrimitive()
+                    .getAsDouble();
+
     Point eventGeom = (Point) SpGeometryBuilder.createSPGeom(point, epsg);
     LOG.debug("Geometry Point created");
-    //adds point and m value to trajectory object
+
     trajectory.addPointToTrajectory(eventGeom, m);
     LOG.debug("Point added to trajectory");
-    // returns JTS LineString
+
     LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
-    // adds to stream
+
     event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
     event.addField(TRAJECTORY_GEOMETRY_RUNTIME, geom.toString());
     event.addField(TRAJECTORY_EPSG_RUNTIME, epsg);
@@ -142,7 +167,6 @@ public class TrajectoryFromPointsProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
index 74544e68df..dbd1992e05 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
@@ -20,11 +20,13 @@ package 
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.compl
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.monitoring.SpLogEntry;
 import org.apache.streampipes.model.monitoring.SpLogMessage;
 import org.apache.streampipes.model.runtime.Event;
@@ -34,20 +36,20 @@ import 
org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
 import 
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.ValidationOutput;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.operation.valid.IsValidOp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TopologyValidationProcessor extends StreamPipesDataProcessor {
+public class TopologyValidationProcessor implements IStreamPipesDataProcessor {
   public static final String GEOM_KEY = "geom-key";
   public static final String EPSG_KEY = "epsg-key";
   public static final String VALIDATION_OUTPUT_KEY = "validation-output-key";
@@ -60,46 +62,57 @@ public class TopologyValidationProcessor extends 
StreamPipesDataProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(TopologyValidationProcessor.class);
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.complex",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
-                Labels.withId(GEOM_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
-                Labels.withId(EPSG_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .build())
-        .outputStrategy(OutputStrategies.keep())
-        .requiredSingleValueSelection(
-            Labels.withId(VALIDATION_OUTPUT_KEY),
-            Options.from(
-                ValidationOutput.VALID.name(),
-                ValidationOutput.INVALID.name()
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        TopologyValidationProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.complex",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+                                    Labels.withId(GEOM_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                                    Labels.withId(EPSG_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.keep())
+            .requiredSingleValueSelection(
+                Labels.withId(VALIDATION_OUTPUT_KEY),
+                Options.from(
+                    ValidationOutput.VALID.name(),
+                    ValidationOutput.INVALID.name()
+                )
             )
-        )
-        .requiredSlideToggle(
-            Labels.withId(LOG_OUTPUT_KEY),
-            false)
-        .build();
+            .requiredSlideToggle(
+                Labels.withId(LOG_OUTPUT_KEY),
+                false
+            )
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-
-    this.params = parameters;
-    this.geometryMapper = 
parameters.extractor().mappingPropertyValue(GEOM_KEY);
-    this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
-    this.isLogOutput = parameters.extractor().slideToggleValue(LOG_OUTPUT_KEY);
-    String readValidationOutput = 
parameters.extractor().selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.geometryMapper = params.extractor()
+                                .mappingPropertyValue(GEOM_KEY);
+    this.epsgMapper = params.extractor()
+                            .mappingPropertyValue(EPSG_KEY);
+    this.isLogOutput = params.extractor()
+                             .slideToggleValue(LOG_OUTPUT_KEY);
+    String readValidationOutput = params.extractor()
+                                        
.selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
 
     if (readValidationOutput.equals(ValidationOutput.VALID.name())) {
       this.outputChoice = ValidationOutput.VALID.name();
@@ -110,19 +123,29 @@ public class TopologyValidationProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String geom = 
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
-    Integer epsg = 
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+    String geom = event.getFieldBySelector(geometryMapper)
+                       .getAsPrimitive()
+                       .getAsString();
+    Integer epsg = event.getFieldBySelector(epsgMapper)
+                        .getAsPrimitive()
+                        .getAsInt();
 
     Geometry geometry = SpGeometryBuilder.createSPGeom(geom, epsg);
     IsValidOp validator = new IsValidOp(geometry);
     validator.setSelfTouchingRingFormingHoleValid(true);
     boolean itIsValid = validator.isValid();
-    if (!itIsValid){
+    if (!itIsValid) {
       if (isLogOutput) {
-        
SpMonitoringManager.INSTANCE.addErrorMessage(params.getGraph().getElementId(),
-            SpLogEntry.from(System.currentTimeMillis(),
+        SpMonitoringManager.INSTANCE.addErrorMessage(
+            params.getGraph()
+                  .getElementId(),
+            SpLogEntry.from(
+                System.currentTimeMillis(),
                 SpLogMessage.from(new SpJtsGeoemtryException(
-                    validator.getValidationError().toString()))));
+                    validator.getValidationError()
+                             .toString()))
+            )
+        );
       }
     }
 
@@ -138,7 +161,6 @@ public class TopologyValidationProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
index 48362ef36f..36526f6c72 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
@@ -19,11 +19,13 @@
 package 
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.simple;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -31,13 +33,12 @@ import 
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.Valida
 import 
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.ValidationType;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.locationtech.jts.geom.Geometry;
 import org.slf4j.Logger;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-public class GeometryValidationProcessor extends StreamPipesDataProcessor {
+public class GeometryValidationProcessor implements IStreamPipesDataProcessor {
   public static final String GEOM_KEY = "geom-key";
   public static final String EPSG_KEY = "epsg-key";
   public static final String VALIDATION_OUTPUT_KEY = "validation-output-key";
@@ -60,50 +61,61 @@ public class GeometryValidationProcessor extends 
StreamPipesDataProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(GeometryValidationProcessor.class);
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.simple",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
-                Labels.withId(GEOM_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .requiredPropertyWithUnaryMapping(
-                
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
-                Labels.withId(EPSG_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .build())
-        .outputStrategy(OutputStrategies.keep())
-        .requiredSingleValueSelection(
-            Labels.withId(VALIDATION_OUTPUT_KEY),
-            Options.from(
-                ValidationOutput.VALID.name(),
-                ValidationOutput.INVALID.name()
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        GeometryValidationProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.simple",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+                                    Labels.withId(GEOM_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                                    Labels.withId(EPSG_KEY),
+                                    PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.keep())
+            .requiredSingleValueSelection(
+                Labels.withId(VALIDATION_OUTPUT_KEY),
+                Options.from(
+                    ValidationOutput.VALID.name(),
+                    ValidationOutput.INVALID.name()
+                )
             )
-        )
-        .requiredMultiValueSelection(
-            Labels.withId(VALIDATION_TYPE_KEY),
-            Options.from(
-                ValidationType.IsEmpty.name(),
-                ValidationType.IsSimple.name()
+            .requiredMultiValueSelection(
+                Labels.withId(VALIDATION_TYPE_KEY),
+                Options.from(
+                    ValidationType.IsEmpty.name(),
+                    ValidationType.IsSimple.name()
+                )
             )
-        )
-        .build();
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-
-    this.geometryMapper = 
parameters.extractor().mappingPropertyValue(GEOM_KEY);
-    this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
-
-    String readValidationOutput = 
parameters.extractor().selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
-    List<String> readValidationType = 
parameters.extractor().selectedMultiValues(VALIDATION_TYPE_KEY, String.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.geometryMapper = params.extractor()
+                                .mappingPropertyValue(GEOM_KEY);
+    this.epsgMapper = params.extractor()
+                            .mappingPropertyValue(EPSG_KEY);
+
+    String readValidationOutput = params.extractor()
+                                        
.selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
+    List<String> readValidationType = params.extractor()
+                                            
.selectedMultiValues(VALIDATION_TYPE_KEY, String.class);
 
     if (readValidationOutput.equals(ValidationOutput.VALID.name())) {
       this.outputChoice = ValidationOutput.VALID.name();
@@ -116,10 +128,12 @@ public class GeometryValidationProcessor extends 
StreamPipesDataProcessor {
       this.isEmptySelected = true;
       this.isSimpleSelected = true;
       this.isMultiSelected = true;
-    } else if 
(readValidationType.get(0).equals(ValidationType.IsEmpty.name())) {
+    } else if (readValidationType.get(0)
+                                 .equals(ValidationType.IsEmpty.name())) {
       this.isEmptySelected = true;
       this.isSimpleSelected = false;
-    } else if 
(readValidationType.get(0).equals(ValidationType.IsSimple.name())) {
+    } else if (readValidationType.get(0)
+                                 .equals(ValidationType.IsSimple.name())) {
       this.isEmptySelected = false;
       this.isSimpleSelected = true;
     }
@@ -127,8 +141,12 @@ public class GeometryValidationProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String geom = 
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
-    Integer sourceEpsg = 
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+    String geom = event.getFieldBySelector(geometryMapper)
+                       .getAsPrimitive()
+                       .getAsString();
+    Integer sourceEpsg = event.getFieldBySelector(epsgMapper)
+                              .getAsPrimitive()
+                              .getAsInt();
 
     Geometry geometry = SpGeometryBuilder.createSPGeom(geom, sourceEpsg);
 
@@ -154,7 +172,6 @@ public class GeometryValidationProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
index a80a143be0..b1aa67add9 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
@@ -19,17 +19,20 @@
 package 
org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversine;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
@@ -37,12 +40,10 @@ import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
 import org.apache.streampipes.vocabulary.Geo;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.net.URI;
 
-public class HaversineDistanceCalculatorProcessor extends 
StreamPipesDataProcessor {
+public class HaversineDistanceCalculatorProcessor implements 
IStreamPipesDataProcessor {
   private static final String LAT_1_KEY = "lat1";
   private static final String LONG_1_KEY = "long1";
   private static final String LAT_2_KEY = "lat2";
@@ -54,51 +55,79 @@ public class HaversineDistanceCalculatorProcessor extends 
StreamPipesDataProcess
   String long2FieldMapper;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create(
-            
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversine",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
-                Labels.withId(LAT_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG),
-                Labels.withId(LONG_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
-                Labels.withId(LAT_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG),
-                Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-            .build()
-        )
-        .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
-                .create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
-                .semanticType(SO.NUMBER)
-                
.measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
-                .build())
-            )
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        HaversineDistanceCalculatorProcessor::new,
+        ProcessingElementBuilder.create(
+                                    
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversine",
 0)
+                                .category(DataProcessorType.GEO)
+                                .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+                                .withLocales(Locales.EN)
+                                .requiredStream(StreamRequirementsBuilder
+                                                    .create()
+                                                    
.requiredPropertyWithUnaryMapping(
+                                                        
EpRequirements.semanticTypeReq(Geo.LAT),
+                                                        
Labels.withId(LAT_1_KEY), PropertyScope.MEASUREMENT_PROPERTY
+                                                    )
+                                                    
.requiredPropertyWithUnaryMapping(
+                                                        
EpRequirements.semanticTypeReq(Geo.LNG),
+                                                        
Labels.withId(LONG_1_KEY), PropertyScope.MEASUREMENT_PROPERTY
+                                                    )
+                                                    
.requiredPropertyWithUnaryMapping(
+                                                        
EpRequirements.semanticTypeReq(Geo.LAT),
+                                                        
Labels.withId(LAT_2_KEY), PropertyScope.MEASUREMENT_PROPERTY
+                                                    )
+                                                    
.requiredPropertyWithUnaryMapping(
+                                                        
EpRequirements.semanticTypeReq(Geo.LNG),
+                                                        
Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY
+                                                    )
+                                                    .build()
+                                )
+                                
.outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+                                                                            
.create(
+                                                                               
 Datatypes.Float,
+                                                                               
 DISTANCE_RUNTIME_NAME
+                                                                            )
+                                                                            
.semanticType(SO.NUMBER)
+                                                                            
.measurementUnit(URI.create(
+                                                                               
 "http://qudt.org/vocab/unit#Kilometer"))
+                                                                            
.build())
+                                )
+                                .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-
-    this.lat1FieldMapper = 
parameters.extractor().mappingPropertyValue(LAT_1_KEY);
-    this.long1FieldMapper = 
parameters.extractor().mappingPropertyValue(LONG_1_KEY);
-    this.lat2FieldMapper = 
parameters.extractor().mappingPropertyValue(LAT_2_KEY);
-    this.long2FieldMapper = 
parameters.extractor().mappingPropertyValue(LONG_2_KEY);
-
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.lat1FieldMapper = params.extractor()
+                                 .mappingPropertyValue(LAT_1_KEY);
+    this.long1FieldMapper = params.extractor()
+                                  .mappingPropertyValue(LONG_1_KEY);
+    this.lat2FieldMapper = params.extractor()
+                                 .mappingPropertyValue(LAT_2_KEY);
+    this.long2FieldMapper = params.extractor()
+                                  .mappingPropertyValue(LONG_2_KEY);
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
 
-    float lat1 = 
event.getFieldBySelector(lat1FieldMapper).getAsPrimitive().getAsFloat();
-    float long1 = 
event.getFieldBySelector(long1FieldMapper).getAsPrimitive().getAsFloat();
-    float lat2 = 
event.getFieldBySelector(lat2FieldMapper).getAsPrimitive().getAsFloat();
-    float long2 = 
event.getFieldBySelector(long2FieldMapper).getAsPrimitive().getAsFloat();
+    float lat1 = event.getFieldBySelector(lat1FieldMapper)
+                      .getAsPrimitive()
+                      .getAsFloat();
+    float long1 = event.getFieldBySelector(long1FieldMapper)
+                       .getAsPrimitive()
+                       .getAsFloat();
+    float lat2 = event.getFieldBySelector(lat2FieldMapper)
+                      .getAsPrimitive()
+                      .getAsFloat();
+    float long2 = event.getFieldBySelector(long2FieldMapper)
+                       .getAsPrimitive()
+                       .getAsFloat();
 
     double resultDist = HaversineDistanceUtil.dist(lat1, long1, lat2, long2);
 
@@ -108,7 +137,6 @@ public class HaversineDistanceCalculatorProcessor extends 
StreamPipesDataProcess
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
index 187dce6633..a26cf0e6cb 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
@@ -19,17 +19,20 @@
 package 
org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversinestatic;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
@@ -37,12 +40,10 @@ import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
 import org.apache.streampipes.vocabulary.Geo;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.net.URI;
 
-public class HaversineStaticDistanceCalculatorProcessor extends 
StreamPipesDataProcessor {
+public class HaversineStaticDistanceCalculatorProcessor implements 
IStreamPipesDataProcessor {
 
   private static final String LATITUDE_KEY = "latitude-key";
   private static final String LONGITUDE_KEY = "longitude-key";
@@ -56,59 +57,77 @@ public class HaversineStaticDistanceCalculatorProcessor 
extends StreamPipesDataP
   Float selectedLongitude;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create(
-            
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversinestatic",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.semanticTypeReq(Geo.LAT),
-                Labels.withId(LATITUDE_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.semanticTypeReq(Geo.LNG),
-                Labels.withId(LONGITUDE_KEY),
-                PropertyScope.MEASUREMENT_PROPERTY)
-            .build()
-        )
-        .requiredFloatParameter(Labels.withId(SELECTED_LATITUDE_KEY))
-        .requiredFloatParameter(Labels.withId(SELECTED_LONGITUDE_KEY))
-        .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
-            .create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
-            .semanticType(SO.NUMBER)
-            
.measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
-            .build())
-        )
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        HaversineStaticDistanceCalculatorProcessor::new,
+        ProcessingElementBuilder.create(
+                                    
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversinestatic",
 0)
+                                .category(DataProcessorType.GEO)
+                                .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+                                .withLocales(Locales.EN)
+                                .requiredStream(StreamRequirementsBuilder
+                                                    .create()
+                                                    
.requiredPropertyWithUnaryMapping(
+                                                        
EpRequirements.semanticTypeReq(Geo.LAT),
+                                                        
Labels.withId(LATITUDE_KEY),
+                                                        
PropertyScope.MEASUREMENT_PROPERTY
+                                                    )
+                                                    
.requiredPropertyWithUnaryMapping(
+                                                        
EpRequirements.semanticTypeReq(Geo.LNG),
+                                                        
Labels.withId(LONGITUDE_KEY),
+                                                        
PropertyScope.MEASUREMENT_PROPERTY
+                                                    )
+                                                    .build()
+                                )
+                                
.requiredFloatParameter(Labels.withId(SELECTED_LATITUDE_KEY))
+                                
.requiredFloatParameter(Labels.withId(SELECTED_LONGITUDE_KEY))
+                                
.outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+                                                                            
.create(
+                                                                               
 Datatypes.Float,
+                                                                               
 DISTANCE_RUNTIME_NAME
+                                                                            )
+                                                                            
.semanticType(SO.NUMBER)
+                                                                            
.measurementUnit(URI.create(
+                                                                               
 "http://qudt.org/vocab/unit#Kilometer"))
+                                                                            
.build())
+                                )
+                                .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.latitudeFieldMapper = 
parameters.extractor().mappingPropertyValue(LATITUDE_KEY);
-    this.longitudeFieldMapper = 
parameters.extractor().mappingPropertyValue(LONGITUDE_KEY);
-    this.selectedLatitude = 
parameters.extractor().singleValueParameter(SELECTED_LATITUDE_KEY, Float.class);
-    this.selectedLongitude = 
parameters.extractor().singleValueParameter(SELECTED_LONGITUDE_KEY, 
Float.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.latitudeFieldMapper = params.extractor()
+                                     .mappingPropertyValue(LATITUDE_KEY);
+    this.longitudeFieldMapper = params.extractor()
+                                      .mappingPropertyValue(LONGITUDE_KEY);
+    this.selectedLatitude = params.extractor()
+                                  .singleValueParameter(SELECTED_LATITUDE_KEY, 
Float.class);
+    this.selectedLongitude = params.extractor()
+                                   
.singleValueParameter(SELECTED_LONGITUDE_KEY, Float.class);
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    Float latitude = 
event.getFieldBySelector(latitudeFieldMapper).getAsPrimitive().getAsFloat();
-    Float longitude = 
event.getFieldBySelector(longitudeFieldMapper).getAsPrimitive().getAsFloat();
+    Float latitude = event.getFieldBySelector(latitudeFieldMapper)
+                          .getAsPrimitive()
+                          .getAsFloat();
+    Float longitude = event.getFieldBySelector(longitudeFieldMapper)
+                           .getAsPrimitive()
+                           .getAsFloat();
 
     Float distance = HaversineDistanceUtil.dist(latitude, longitude, 
selectedLatitude, selectedLongitude);
 
     event.addField(DISTANCE_RUNTIME_NAME, distance);
 
     collector.collect(event);
-
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
index b42df348fa..37ebf68d1e 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
@@ -19,24 +19,25 @@
 package 
org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemaps;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import com.google.maps.GeoApiContext;
 import com.google.maps.GeocodingApi;
@@ -45,7 +46,7 @@ import com.google.maps.model.GeocodingResult;
 
 import java.io.IOException;
 
-public class GoogleMapsGeocoderProcessor extends StreamPipesDataProcessor {
+public class GoogleMapsGeocoderProcessor implements IStreamPipesDataProcessor {
   private static final String GEOCODER_REQUEST_KEY = "geocoder-request-key";
   private static final String LAT_RUNTIME_NAME = "geocoder-latitude";
   private static final String LONG_RUNTIME_NAME = "geocoder-longitude";
@@ -53,33 +54,43 @@ public class GoogleMapsGeocoderProcessor extends 
StreamPipesDataProcessor {
   private GeoApiContext apiContext;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemaps",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
-                Labels.withId(GEOCODER_REQUEST_KEY),
-                PropertyScope.NONE)
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        GoogleMapsGeocoderProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemaps",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.stringReq(),
+                                    Labels.withId(GEOCODER_REQUEST_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build()
+            )
+            .outputStrategy(OutputStrategies.append(
+                                EpProperties.doubleEp(Labels.empty(), 
LAT_RUNTIME_NAME, Geo.LAT),
+                                EpProperties.stringEp(Labels.empty(), 
LONG_RUNTIME_NAME, Geo.LNG)
+                            )
+            )
             .build()
-        )
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.doubleEp(Labels.empty(), LAT_RUNTIME_NAME, Geo.LAT),
-            EpProperties.stringEp(Labels.empty(), LONG_RUNTIME_NAME, Geo.LNG))
-        )
-        .build();
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.geocoderRequest = params.extractor()
+                                 .mappingPropertyValue(GEOCODER_REQUEST_KEY);
 
-    this.geocoderRequest = 
parameters.extractor().mappingPropertyValue(GEOCODER_REQUEST_KEY);
-
-    String googleMapsApiKey = 
runtimeContext.getConfigStore().getString(ConfigKeys.GOOGLE_API_KEY);
+    String googleMapsApiKey = runtimeContext.getConfigStore()
+                                            
.getString(ConfigKeys.GOOGLE_API_KEY);
 
     if (googleMapsApiKey == null || googleMapsApiKey.equals("")) {
       throw new SpRuntimeException("Could not start Geocoder. Did you forget 
to add a Google Maps API key?");
@@ -88,18 +99,17 @@ public class GoogleMapsGeocoderProcessor extends 
StreamPipesDataProcessor {
           .apiKey(googleMapsApiKey)
           .build();
     }
-    // TODO: RequestDeniedException
-    //  You must enable Billing on the Google Cloud Project at
-    //  https://console.cloud.google.com/project/_/billing/enable
-    //  Learn more at https://developers.google.com/maps/gmp
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String request = 
event.getFieldBySelector(geocoderRequest).getAsPrimitive().getAsString();
+    String request = event.getFieldBySelector(geocoderRequest)
+                          .getAsPrimitive()
+                          .getAsString();
 
     try {
-      GeocodingResult[] results = GeocodingApi.geocode(apiContext, 
request).await();
+      GeocodingResult[] results = GeocodingApi.geocode(apiContext, request)
+                                              .await();
 
       Double latitude = results[0].geometry.location.lat;
       Double longitude = results[0].geometry.location.lng;
@@ -116,7 +126,6 @@ public class GoogleMapsGeocoderProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
index 83d2de7ae1..13a4a9280e 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
@@ -19,23 +19,24 @@
 package 
org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemapsstatic;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import com.google.maps.GeoApiContext;
 import com.google.maps.GeocodingApi;
@@ -44,7 +45,7 @@ import com.google.maps.model.GeocodingResult;
 
 import java.io.IOException;
 
-public class GoogleMapsStaticGeocoderProcessor extends 
StreamPipesDataProcessor {
+public class GoogleMapsStaticGeocoderProcessor implements 
IStreamPipesDataProcessor {
   private static final String STATIC_GEOCODER_REQUEST_KEY = "sg-request-key";
   private static final String LAT_RUNTIME_NAME = "staticgeocoder-latitude";
   private static final String LONG_RUNTIME_NAME = "staticgeocoder-longitude";
@@ -54,49 +55,57 @@ public class GoogleMapsStaticGeocoderProcessor extends 
StreamPipesDataProcessor
   private Double responseLongitude;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemapsstatic",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(
-            StreamRequirementsBuilder
-                .create()
-                .requiredProperty(EpRequirements.anyProperty())
-                .build()
-        )
-        .requiredTextParameter(Labels.withId(STATIC_GEOCODER_REQUEST_KEY))
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.doubleEp(Labels.empty(), LAT_RUNTIME_NAME, Geo.LAT),
-            EpProperties.stringEp(Labels.empty(), LONG_RUNTIME_NAME, Geo.LNG))
-        )
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        GoogleMapsStaticGeocoderProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemapsstatic",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(
+                StreamRequirementsBuilder
+                    .create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build()
+            )
+            .requiredTextParameter(Labels.withId(STATIC_GEOCODER_REQUEST_KEY))
+            .outputStrategy(OutputStrategies.append(
+                                EpProperties.doubleEp(Labels.empty(), 
LAT_RUNTIME_NAME, Geo.LAT),
+                                EpProperties.stringEp(Labels.empty(), 
LONG_RUNTIME_NAME, Geo.LNG)
+                            )
+            )
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-
-    this.staticGeocoderRequest =
-        
parameters.extractor().singleValueParameter(STATIC_GEOCODER_REQUEST_KEY, 
String.class);
-    String googleMapsApiKey = 
runtimeContext.getConfigStore().getString(ConfigKeys.GOOGLE_API_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.staticGeocoderRequest = params.extractor()
+                                       
.singleValueParameter(STATIC_GEOCODER_REQUEST_KEY, String.class);
+    String googleMapsApiKey = runtimeContext.getConfigStore()
+                                            
.getString(ConfigKeys.GOOGLE_API_KEY);
 
     if (googleMapsApiKey == null || googleMapsApiKey.equals("")) {
-      throw new SpRuntimeException("Could not start Geocoder. Did you forget 
to add a Google Maps" + " API key?");
+      throw new SpRuntimeException("Could not start Geocoder. Did you forget 
to add a Google Maps API key?");
     }
 
-    this.apiContext = new 
GeoApiContext.Builder().apiKey(googleMapsApiKey).build();
+    this.apiContext = new GeoApiContext.Builder().apiKey(googleMapsApiKey)
+                                                 .build();
 
     try {
-      GeocodingResult[] results = GeocodingApi.geocode(apiContext, 
staticGeocoderRequest).await();
+      GeocodingResult[] results = GeocodingApi.geocode(apiContext, 
staticGeocoderRequest)
+                                              .await();
       this.responseLatitude = results[0].geometry.location.lat;
       this.responseLongitude = results[0].geometry.location.lng;
     } catch (ApiException | IOException | InterruptedException e) {
       e.printStackTrace();
       throw new SpRuntimeException("Could not fetch geocoding result");
     }
-
   }
 
   @Override
@@ -105,11 +114,9 @@ public class GoogleMapsStaticGeocoderProcessor extends 
StreamPipesDataProcessor
     event.addField(LONG_RUNTIME_NAME, responseLongitude);
 
     collector.collect(event);
-
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
index 67c40690b7..8e0d491fe2 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
@@ -19,25 +19,26 @@
 package 
org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname.geocode.GeoName;
 import 
org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname.geocode.ReverseGeoCode;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.apache.http.client.fluent.Request;
 
@@ -46,7 +47,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.zip.ZipInputStream;
 
-public class GeoCityNameRevdecodeProcessor extends StreamPipesDataProcessor {
+public class GeoCityNameRevdecodeProcessor implements 
IStreamPipesDataProcessor {
   private static final String LATITUDE_MAPPING_KEY = "latitude-mapping-key";
   private static final String LONGITUDE_MAPPING_KEY = "longitude-mapping-key";
   private static final String GEONAME_RUNTIME_NAME = "geoname";
@@ -57,38 +58,49 @@ public class GeoCityNameRevdecodeProcessor extends 
StreamPipesDataProcessor {
   private ReverseGeoCode reverseGeoCode;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
-                Labels.withId(LATITUDE_MAPPING_KEY),
-                PropertyScope.NONE)
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG),
-                Labels.withId(LONGITUDE_MAPPING_KEY),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.stringEp(Labels.empty(), GEONAME_RUNTIME_NAME, 
"http://schema.org/city")
-        ))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        GeoCityNameRevdecodeProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.semanticTypeReq(Geo.LAT),
+                                    Labels.withId(LATITUDE_MAPPING_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.semanticTypeReq(Geo.LNG),
+                                    Labels.withId(LONGITUDE_MAPPING_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.stringEp(Labels.empty(), GEONAME_RUNTIME_NAME, 
"http://schema.org/city")
+            ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.latitudeFieldMapper = 
parameters.extractor().mappingPropertyValue(LATITUDE_MAPPING_KEY);
-    this.longitudeFieldMapper = 
parameters.extractor().mappingPropertyValue(LONGITUDE_MAPPING_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.latitudeFieldMapper = params.extractor()
+                                     
.mappingPropertyValue(LATITUDE_MAPPING_KEY);
+    this.longitudeFieldMapper = params.extractor()
+                                      
.mappingPropertyValue(LONGITUDE_MAPPING_KEY);
 
     try {
       InputStream stream = downloadCitiesDataSet();
       if (stream != null) {
-        ZipInputStream zipInputStream = null;
-        zipInputStream = new ZipInputStream(stream);
+        ZipInputStream zipInputStream = new ZipInputStream(stream);
         this.reverseGeoCode = new ReverseGeoCode(zipInputStream, false);
       }
     } catch (IOException e) {
@@ -99,8 +111,12 @@ public class GeoCityNameRevdecodeProcessor extends 
StreamPipesDataProcessor {
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
 
-    Double latitude = 
event.getFieldBySelector(latitudeFieldMapper).getAsPrimitive().getAsDouble();
-    Double longitude = 
event.getFieldBySelector(longitudeFieldMapper).getAsPrimitive().getAsDouble();
+    Double latitude = event.getFieldBySelector(latitudeFieldMapper)
+                           .getAsPrimitive()
+                           .getAsDouble();
+    Double longitude = event.getFieldBySelector(longitudeFieldMapper)
+                            .getAsPrimitive()
+                            .getAsDouble();
 
     GeoName geoName = this.reverseGeoCode.nearestPlace(latitude, longitude);
 
@@ -109,12 +125,14 @@ public class GeoCityNameRevdecodeProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 
   private InputStream downloadCitiesDataSet() throws IOException {
-    byte[] citiesDataset = 
Request.Get(CITIES_DATASET_URL).execute().returnContent().asBytes();
+    byte[] citiesDataset = Request.Get(CITIES_DATASET_URL)
+                                  .execute()
+                                  .returnContent()
+                                  .asBytes();
     return new ByteArrayInputStream(citiesDataset);
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
index a67f3bfd64..3165e66f5f 100644
--- 
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
@@ -19,17 +19,20 @@
 package 
org.apache.streampipes.processors.geo.jvm.latlong.processor.speedcalculator;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
@@ -37,14 +40,12 @@ import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
 import org.apache.streampipes.vocabulary.Geo;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.apache.commons.collections.buffer.CircularFifoBuffer;
 
 import java.net.URI;
 
-public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
+public class SpeedCalculatorProcessor implements IStreamPipesDataProcessor {
   private static final String TIMESTAMP_KEY = "timestamp-key";
   private static final String LATITUDE_KEY = "latitude-key";
   private static final String LONGITUDE_KEY = "longitude-key";
@@ -57,39 +58,55 @@ public class SpeedCalculatorProcessor extends 
StreamPipesDataProcessor {
   private CircularFifoBuffer buffer;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.speedcalculator",
 0)
-        .category(DataProcessorType.GEO)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
-                Labels.withId(TIMESTAMP_KEY), PropertyScope.HEADER_PROPERTY)
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT)
-                , Labels.withId(LATITUDE_KEY), 
PropertyScope.MEASUREMENT_PROPERTY)
-            
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG)
-                , Labels.withId(LONGITUDE_KEY), 
PropertyScope.MEASUREMENT_PROPERTY)
-            .build())
-        .requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
-        .outputStrategy(
-            OutputStrategies.append(PrimitivePropertyBuilder
-                .create(Datatypes.Float, SPEED_RUNTIME_NAME)
-                .semanticType(SO.NUMBER)
-                
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
-                .build())
-        )
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        SpeedCalculatorProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.speedcalculator",
 0)
+            .category(DataProcessorType.GEO)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.timestampReq(),
+                                    Labels.withId(TIMESTAMP_KEY), 
PropertyScope.HEADER_PROPERTY
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.semanticTypeReq(Geo.LAT),
+                                    Labels.withId(LATITUDE_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.semanticTypeReq(Geo.LNG),
+                                    Labels.withId(LONGITUDE_KEY), 
PropertyScope.MEASUREMENT_PROPERTY
+                                )
+                                .build())
+            .requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
+            .outputStrategy(
+                OutputStrategies.append(PrimitivePropertyBuilder
+                                            .create(Datatypes.Float, 
SPEED_RUNTIME_NAME)
+                                            .semanticType(SO.NUMBER)
+                                            
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
+                                            .build())
+            )
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.latitudeFieldMapper = 
parameters.extractor().mappingPropertyValue(LATITUDE_KEY);
-    this.longitudeFieldMapper = 
parameters.extractor().mappingPropertyValue(LONGITUDE_KEY);
-    this.timestampFieldMapper = 
parameters.extractor().mappingPropertyValue(TIMESTAMP_KEY);
-    this.countWindowSize = 
parameters.extractor().singleValueParameter(COUNT_WINDOW_KEY, Integer.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
+    this.latitudeFieldMapper = params.extractor()
+                                     .mappingPropertyValue(LATITUDE_KEY);
+    this.longitudeFieldMapper = params.extractor()
+                                      .mappingPropertyValue(LONGITUDE_KEY);
+    this.timestampFieldMapper = params.extractor()
+                                      .mappingPropertyValue(TIMESTAMP_KEY);
+    this.countWindowSize = params.extractor()
+                                 .singleValueParameter(COUNT_WINDOW_KEY, 
Integer.class);
     this.buffer = new CircularFifoBuffer(countWindowSize);
   }
 
@@ -105,8 +122,7 @@ public class SpeedCalculatorProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 
   private double calculateSpeed(Event firstEvent, Event currentEvent) {
@@ -118,8 +134,10 @@ public class SpeedCalculatorProcessor extends 
StreamPipesDataProcessor {
     Float currentLongitude = getFloat(currentEvent, this.longitudeFieldMapper);
     Long currentTimestamp = getLong(currentEvent, this.timestampFieldMapper);
 
-    Float distanceInKm = HaversineDistanceUtil.dist(firstLatitude, 
firstLongitude, currentLatitude,
-        currentLongitude);
+    Float distanceInKm = HaversineDistanceUtil.dist(
+        firstLatitude, firstLongitude, currentLatitude,
+        currentLongitude
+    );
 
     Double durationInSeconds = Double.valueOf((currentTimestamp - 
firstTimestamp) / 1000.0);
 
@@ -130,10 +148,14 @@ public class SpeedCalculatorProcessor extends 
StreamPipesDataProcessor {
   }
 
   private Long getLong(Event event, String fieldName) {
-    return event.getFieldBySelector(fieldName).getAsPrimitive().getAsLong();
+    return event.getFieldBySelector(fieldName)
+                .getAsPrimitive()
+                .getAsLong();
   }
 
   private Float getFloat(Event event, String fieldName) {
-    return event.getFieldBySelector(fieldName).getAsPrimitive().getAsFloat();
+    return event.getFieldBySelector(fieldName)
+                .getAsPrimitive()
+                .getAsFloat();
   }
 }

Reply via email to