This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new d0294a79ae refactor(#3652): Update processors to implement
IStreamPipesDataProcessor interface (#3664)
d0294a79ae is described below
commit d0294a79aeacd15319e863efcf30c31e241dc46a
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jul 11 09:02:28 2025 +0200
refactor(#3652): Update processors to implement IStreamPipesDataProcessor
interface (#3664)
---
.../buffergeometry/BufferGeomProcessor.java | 195 ++++++++++++---------
.../bufferpoint/BufferPointProcessor.java | 145 ++++++++-------
.../geo/jvm/jts/processor/epsg/EpsgProcessor.java | 56 +++---
.../LatLngToJtsPointProcessor.java | 101 ++++++-----
.../reprojection/ReprojectionProcessor.java | 90 ++++++----
.../trajectory/TrajectoryFromPointsProcessor.java | 148 +++++++++-------
.../complex/TopologyValidationProcessor.java | 116 +++++++-----
.../simple/GeometryValidationProcessor.java | 115 ++++++------
.../HaversineDistanceCalculatorProcessor.java | 114 +++++++-----
...HaversineStaticDistanceCalculatorProcessor.java | 103 ++++++-----
.../googlemaps/GoogleMapsGeocoderProcessor.java | 77 ++++----
.../GoogleMapsStaticGeocoderProcessor.java | 77 ++++----
.../geocityname/GeoCityNameRevdecodeProcessor.java | 86 +++++----
.../speedcalculator/SpeedCalculatorProcessor.java | 102 ++++++-----
14 files changed, 890 insertions(+), 635 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
index 99c9dea163..bf01036df5 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/buffergeometry/BufferGeomProcessor.java
@@ -18,11 +18,13 @@
package org.apache.streampipes.processors.geo.jvm.jts.processor.buffergeometry;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -33,21 +35,20 @@ import
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.JoinStyle;
import
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.SpBufferBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.locationtech.jts.geom.Geometry;
import org.opengis.util.FactoryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BufferGeomProcessor extends StreamPipesDataProcessor {
+public class BufferGeomProcessor implements IStreamPipesDataProcessor {
public static final String GEOM_KEY = "geometry-key";
public static final String EPSG_KEY = "epsg-key";
public static final String CAP_KEY = "cap-style-key";
@@ -72,80 +73,92 @@ public class BufferGeomProcessor extends
StreamPipesDataProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(BufferGeomProcessor.class);
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.buffergeometry",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
- Labels.withId(GEOM_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(EPSG_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(
- Labels.withId(GEOM_KEY),
- GEOM_RUNTIME,
- "http://www.opengis.net/ont/geosparql#Geometry"
- ),
- EpProperties.numberEp(
- Labels.withId(EPSG_KEY),
- EPSG_RUNTIME,
- "http://data.ign.fr/def/ignf#CartesianCS"
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ BufferGeomProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.buffergeometry",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(GEOM_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(EPSG_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(
+ Labels.withId(GEOM_KEY),
+ GEOM_RUNTIME,
+
"http://www.opengis.net/ont/geosparql#Geometry"
+ ),
+ EpProperties.numberEp(
+ Labels.withId(EPSG_KEY),
+ EPSG_RUNTIME,
+ "http://data.ign.fr/def/ignf#CartesianCS"
+ )
+ )
+ )
+ .requiredSingleValueSelection(
+ Labels.withId(CAP_KEY),
+ Options.from(
+ CapStyle.Square.name(),
+ CapStyle.Flat.name(),
+ CapStyle.Round.name()
+ )
+ )
+ .requiredSingleValueSelection(
+ Labels.withId(JOIN_KEY),
+ Options.from(
+ JoinStyle.Bevel.name(),
+ JoinStyle.Mitre.name(),
+ JoinStyle.Round.name()
+ )
+ )
+ .requiredSingleValueSelection(
+ Labels.withId(SIDE_KEY),
+ Options.from(
+ BufferSide.Both.name(),
+ BufferSide.Left.name(),
+ BufferSide.Right.name()
)
)
- )
- .requiredSingleValueSelection(
- Labels.withId(CAP_KEY),
- Options.from(
- CapStyle.Square.name(),
- CapStyle.Flat.name(),
- CapStyle.Round.name())
- )
- .requiredSingleValueSelection(
- Labels.withId(JOIN_KEY),
- Options.from(
- JoinStyle.Bevel.name(),
- JoinStyle.Mitre.name(),
- JoinStyle.Round.name())
- )
- .requiredSingleValueSelection(
- Labels.withId(SIDE_KEY),
- Options.from(
- BufferSide.Both.name(),
- BufferSide.Left.name(),
- BufferSide.Right.name())
- )
- .requiredIntegerParameter(
- Labels.withId(MITRE_LIMIT_KEY),
- 5)
- .requiredIntegerParameter(
- Labels.withId(SEGMENTS_KEY),
- 8
- )
- .requiredFloatParameter(
- Labels.withId(SIMPLIFY_FACTOR_KEY),
- 0.01f
- )
- .requiredFloatParameter(
- Labels.withId(DISTANCE_KEY)
- )
- .build();
+ .requiredIntegerParameter(
+ Labels.withId(MITRE_LIMIT_KEY),
+ 5
+ )
+ .requiredIntegerParameter(
+ Labels.withId(SEGMENTS_KEY),
+ 8
+ )
+ .requiredFloatParameter(
+ Labels.withId(SIMPLIFY_FACTOR_KEY),
+ 0.01f
+ )
+ .requiredFloatParameter(
+ Labels.withId(DISTANCE_KEY)
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
try {
- if (SpReprojectionBuilder.isSisConfigurationValid()){
+ if (SpReprojectionBuilder.isSisConfigurationValid()) {
LOG.info("SIS DB Settings successful checked ");
} else {
LOG.warn("The required EPSG database is not imported");
@@ -155,15 +168,24 @@ public class BufferGeomProcessor extends
StreamPipesDataProcessor {
throw new SpRuntimeException("Something unexpected happened " + e);
}
- this.geometryMapper =
parameters.extractor().mappingPropertyValue(GEOM_KEY);
- this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
- String readCapStyle = parameters.extractor().selectedSingleValue(CAP_KEY,
String.class);
- String readJoinStyle =
parameters.extractor().selectedSingleValue(JOIN_KEY, String.class);
- String readSide = parameters.extractor().selectedSingleValue(SIDE_KEY,
String.class);
- this.mitreLimit =
parameters.extractor().singleValueParameter(MITRE_LIMIT_KEY, Double.class);
- this.segments = parameters.extractor().singleValueParameter(SEGMENTS_KEY,
Integer.class);
- this.simplifyFactor =
parameters.extractor().singleValueParameter(SIMPLIFY_FACTOR_KEY, Double.class);
- this.distance = parameters.extractor().singleValueParameter(DISTANCE_KEY,
Double.class);
+ this.geometryMapper = params.extractor()
+ .mappingPropertyValue(GEOM_KEY);
+ this.epsgMapper = params.extractor()
+ .mappingPropertyValue(EPSG_KEY);
+ String readCapStyle = params.extractor()
+ .selectedSingleValue(CAP_KEY, String.class);
+ String readJoinStyle = params.extractor()
+ .selectedSingleValue(JOIN_KEY, String.class);
+ String readSide = params.extractor()
+ .selectedSingleValue(SIDE_KEY, String.class);
+ this.mitreLimit = params.extractor()
+ .singleValueParameter(MITRE_LIMIT_KEY,
Double.class);
+ this.segments = params.extractor()
+ .singleValueParameter(SEGMENTS_KEY, Integer.class);
+ this.simplifyFactor = params.extractor()
+ .singleValueParameter(SIMPLIFY_FACTOR_KEY,
Double.class);
+ this.distance = params.extractor()
+ .singleValueParameter(DISTANCE_KEY, Double.class);
// transform names to numbers
this.capStyle = 1;
if (readCapStyle.equals(CapStyle.Square.name())) {
@@ -192,8 +214,12 @@ public class BufferGeomProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String geom =
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
- Integer epsg =
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+ String geom = event.getFieldBySelector(geometryMapper)
+ .getAsPrimitive()
+ .getAsString();
+ Integer epsg = event.getFieldBySelector(epsgMapper)
+ .getAsPrimitive()
+ .getAsInt();
Geometry geometry = SpGeometryBuilder.createSPGeom(geom, epsg);
Geometry bufferGeom =
SpBufferBuilder.createSpBuffer(
@@ -205,7 +231,8 @@ public class BufferGeomProcessor extends
StreamPipesDataProcessor {
segments,
simplifyFactor,
singleSided,
- side);
+ side
+ );
if (!bufferGeom.isEmpty()) {
event.addField(GEOM_RUNTIME, bufferGeom.toText());
@@ -220,6 +247,6 @@ public class BufferGeomProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
index 25a4aff818..c4ca305dd0 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/bufferpoint/BufferPointProcessor.java
@@ -18,11 +18,13 @@
package org.apache.streampipes.processors.geo.jvm.jts.processor.bufferpoint;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -31,14 +33,13 @@ import
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.CapStyle;
import
org.apache.streampipes.processors.geo.jvm.jts.helper.buffer.SpBufferBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Point;
@@ -47,7 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BufferPointProcessor extends StreamPipesDataProcessor {
+public class BufferPointProcessor implements IStreamPipesDataProcessor {
public static final String GEOM_KEY = "geometry-key";
public static final String EPSG_KEY = "epsg-key";
public static final String CAP_KEY = "cap-style-key";
@@ -65,62 +66,71 @@ public class BufferPointProcessor extends
StreamPipesDataProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(BufferPointProcessor.class);
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.bufferpoint",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON,
"output.png")
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
- Labels.withId(GEOM_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(EPSG_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(
- Labels.withId(GEOM_KEY),
- GEOM_RUNTIME,
- "http://www.opengis.net/ont/geosparql#Geometry"
- ),
- EpProperties.numberEp(
- Labels.withId(EPSG_KEY),
- EPSG_RUNTIME,
- "http://data.ign.fr/def/ignf#CartesianCS"
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ BufferPointProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.bufferpoint",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON, "output.png")
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(GEOM_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(EPSG_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(
+ Labels.withId(GEOM_KEY),
+ GEOM_RUNTIME,
+
"http://www.opengis.net/ont/geosparql#Geometry"
+ ),
+ EpProperties.numberEp(
+ Labels.withId(EPSG_KEY),
+ EPSG_RUNTIME,
+ "http://data.ign.fr/def/ignf#CartesianCS"
+ )
+ )
+ )
+ .requiredSingleValueSelection(
+ Labels.withId(CAP_KEY),
+ Options.from(
+ CapStyle.Square.name(),
+ CapStyle.Round.name()
)
)
- )
- .requiredSingleValueSelection(
- Labels.withId(CAP_KEY),
- Options.from(
- CapStyle.Square.name(),
- CapStyle.Round.name())
- )
- .requiredIntegerParameter(
- Labels.withId(SEGMENTS_KEY),
- 8
- )
- .requiredFloatParameter(
- Labels.withId(SIMPLIFY_FACTOR_KEY),
- 0.01f
- )
- .requiredFloatParameter(
- Labels.withId(DISTANCE_KEY)
- )
- .build();
+ .requiredIntegerParameter(
+ Labels.withId(SEGMENTS_KEY),
+ 8
+ )
+ .requiredFloatParameter(
+ Labels.withId(SIMPLIFY_FACTOR_KEY),
+ 0.01f
+ )
+ .requiredFloatParameter(
+ Labels.withId(DISTANCE_KEY)
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
try {
- if (SpReprojectionBuilder.isSisConfigurationValid()){
+ if (SpReprojectionBuilder.isSisConfigurationValid()) {
LOG.info("SIS DB Settings successful checked ");
} else {
LOG.warn("The required EPSG database is not imported");
@@ -130,12 +140,18 @@ public class BufferPointProcessor extends
StreamPipesDataProcessor {
throw new SpRuntimeException("Something unexpected happened " + e);
}
- this.geometryMapper =
parameters.extractor().mappingPropertyValue(GEOM_KEY);
- this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
- String readCapStyle = parameters.extractor().selectedSingleValue(CAP_KEY,
String.class);
- this.segments = parameters.extractor().singleValueParameter(SEGMENTS_KEY,
Integer.class);
- this.simplifyFactor =
parameters.extractor().singleValueParameter(SIMPLIFY_FACTOR_KEY, Double.class);
- this.distance = parameters.extractor().singleValueParameter(DISTANCE_KEY,
Double.class);
+ this.geometryMapper = params.extractor()
+ .mappingPropertyValue(GEOM_KEY);
+ this.epsgMapper = params.extractor()
+ .mappingPropertyValue(EPSG_KEY);
+ String readCapStyle = params.extractor()
+ .selectedSingleValue(CAP_KEY, String.class);
+ this.segments = params.extractor()
+ .singleValueParameter(SEGMENTS_KEY, Integer.class);
+ this.simplifyFactor = params.extractor()
+ .singleValueParameter(SIMPLIFY_FACTOR_KEY,
Double.class);
+ this.distance = params.extractor()
+ .singleValueParameter(DISTANCE_KEY, Double.class);
// transform names to numbers
this.capStyle = 1;
@@ -148,8 +164,12 @@ public class BufferPointProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String geom =
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
- Integer epsg =
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+ String geom = event.getFieldBySelector(geometryMapper)
+ .getAsPrimitive()
+ .getAsString();
+ Integer epsg = event.getFieldBySelector(epsgMapper)
+ .getAsPrimitive()
+ .getAsInt();
Geometry geometry = SpGeometryBuilder.createSPGeom(geom, epsg);
@@ -167,7 +187,8 @@ public class BufferPointProcessor extends
StreamPipesDataProcessor {
LOG.warn("Only points are supported but input type is " +
geometry.getGeometryType());
}
}
+
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
index 3126196d84..7a28f28db9 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
@@ -19,51 +19,59 @@
package org.apache.streampipes.processors.geo.jvm.jts.processor.epsg;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class EpsgProcessor extends StreamPipesDataProcessor {
+public class EpsgProcessor implements IStreamPipesDataProcessor {
private static final String EPSG_KEY = "epsg-key";
private static final String EPSG_RUNTIME = "epsg";
private int epsgCode;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.epsg", 0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .build())
- .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
- .create(Datatypes.Integer, EPSG_RUNTIME)
- .semanticType("http://data.ign.fr/def/ignf#CartesianCS")
- .build())
- )
- .requiredIntegerParameter(Labels.withId(EPSG_KEY), 4326)
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ EpsgProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.epsg", 0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .build())
+ .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+
.create(Datatypes.Integer, EPSG_RUNTIME)
+
.semanticType("http://data.ign.fr/def/ignf#CartesianCS")
+ .build())
+ )
+ .requiredIntegerParameter(Labels.withId(EPSG_KEY), 4326)
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- this.epsgCode = parameters.extractor().singleValueParameter(EPSG_KEY,
Integer.class);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.epsgCode = params.extractor()
+ .singleValueParameter(EPSG_KEY, Integer.class);
}
@Override
@@ -73,7 +81,7 @@ public class EpsgProcessor extends StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
index 3117685f79..0a208459dc 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
@@ -19,31 +19,32 @@
package
org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.locationtech.jts.geom.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
+public class LatLngToJtsPointProcessor implements IStreamPipesDataProcessor {
private static final String LAT_KEY = "latitude-key";
private static final String LNG_KEY = "longitude-key";
private static final String EPSG_KEY = "epsg-key";
@@ -53,50 +54,69 @@ public class LatLngToJtsPointProcessor extends
StreamPipesDataProcessor {
private String epsgMapper;
private static final Logger LOG =
LoggerFactory.getLogger(LatLngToJtsPointProcessor.class);
public static final String EPA_NAME = "Latitude Longitude To JTS Point";
+
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(
- StreamRequirementsBuilder
- .create()
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
- Labels.withId(LAT_KEY), PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.semanticTypeReq(Geo.LNG),
- Labels.withId(LNG_KEY), PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(EPSG_KEY),
PropertyScope.MEASUREMENT_PROPERTY)
- .build()
- )
- .outputStrategy(
- OutputStrategies.append(
- PrimitivePropertyBuilder
- .create(Datatypes.String, GEOMETRY_RUNTIME)
-
.semanticType("http://www.opengis.net/ont/geosparql#Geometry")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ LatLngToJtsPointProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(
+ StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq(Geo.LAT),
+ Labels.withId(LAT_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq(Geo.LNG),
+ Labels.withId(LNG_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(EPSG_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
.build()
)
- )
- .build();
+ .outputStrategy(
+ OutputStrategies.append(
+ PrimitivePropertyBuilder
+ .create(Datatypes.String, GEOMETRY_RUNTIME)
+
.semanticType("http://www.opengis.net/ont/geosparql#Geometry")
+ .build()
+ )
+ )
+ .build());
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- this.latitudeMapper = parameters.extractor().mappingPropertyValue(LAT_KEY);
- this.longitudeMapper =
parameters.extractor().mappingPropertyValue(LNG_KEY);
- this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.latitudeMapper = params.extractor()
+ .mappingPropertyValue(LAT_KEY);
+ this.longitudeMapper = params.extractor()
+ .mappingPropertyValue(LNG_KEY);
+ this.epsgMapper = params.extractor()
+ .mappingPropertyValue(EPSG_KEY);
}
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- Double lat =
event.getFieldBySelector(latitudeMapper).getAsPrimitive().getAsDouble();
- Double lng =
event.getFieldBySelector(longitudeMapper).getAsPrimitive().getAsDouble();
- Integer epsg =
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+ Double lat = event.getFieldBySelector(latitudeMapper)
+ .getAsPrimitive()
+ .getAsDouble();
+ Double lng = event.getFieldBySelector(longitudeMapper)
+ .getAsPrimitive()
+ .getAsDouble();
+ Integer epsg = event.getFieldBySelector(epsgMapper)
+ .getAsPrimitive()
+ .getAsInt();
Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);
@@ -107,13 +127,12 @@ public class LatLngToJtsPointProcessor extends
StreamPipesDataProcessor {
collector.collect(event);
} else {
LOG.warn("An empty point geometry in " + EPA_NAME + " is created due"
- + "invalid input field. Latitude: " + lat + "Longitude: " + lng);
+ + "invalid input field. Latitude: " + lat + "Longitude: " +
lng);
LOG.error("An event is filtered out because of invalid geometry");
}
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
index d1f8696732..78cb7297ed 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
@@ -19,11 +19,13 @@
package org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
@@ -31,19 +33,18 @@ import
org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
import
org.apache.streampipes.processors.geo.jvm.jts.helper.SpReprojectionBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.locationtech.jts.geom.Geometry;
import org.opengis.util.FactoryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ReprojectionProcessor extends StreamPipesDataProcessor {
+public class ReprojectionProcessor implements IStreamPipesDataProcessor {
public static final String GEOM_KEY = "geom-key";
public static final String SOURCE_EPSG_KEY = "source-epsg-key";
public static final String TARGET_EPSG_KEY = "target-epsg-key";
@@ -55,39 +56,49 @@ public class ReprojectionProcessor extends
StreamPipesDataProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(ReprojectionProcessor.class);
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
- Labels.withId(GEOM_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(SOURCE_EPSG_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .outputStrategy(OutputStrategies.keep())
- .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ ReprojectionProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(GEOM_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(SOURCE_EPSG_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
-
- this.geometryMapper =
parameters.extractor().mappingPropertyValue(GEOM_KEY);
- this.sourceEpsgMapper =
parameters.extractor().mappingPropertyValue(SOURCE_EPSG_KEY);
- this.targetEpsg =
parameters.extractor().singleValueParameter(TARGET_EPSG_KEY, Integer.class);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.geometryMapper = params.extractor()
+ .mappingPropertyValue(GEOM_KEY);
+ this.sourceEpsgMapper = params.extractor()
+ .mappingPropertyValue(SOURCE_EPSG_KEY);
+ this.targetEpsg = params.extractor()
+ .singleValueParameter(TARGET_EPSG_KEY,
Integer.class);
// check if SIS DB is set up with imported data or is null
try {
- if (SpReprojectionBuilder.isSisConfigurationValid()){
+ if (SpReprojectionBuilder.isSisConfigurationValid()) {
LOG.info("SIS DB Settings successful checked ");
} else {
LOG.warn("The required EPSG database is not imported");
@@ -102,7 +113,7 @@ public class ReprojectionProcessor extends
StreamPipesDataProcessor {
if (!SpReprojectionBuilder.isSisDbCorrectVersion()) {
LOG.warn("Not supported EPSG DB is used.");
throw new SpRuntimeException("Your current EPSG DB version " +
SpReprojectionBuilder.getSisDbVersion()
- + " is not the supported 9.9.1 version. ");
+ + " is not the supported 9.9.1
version. ");
}
} catch (FactoryException e) {
throw new SpRuntimeException("Something unexpected happened " + e);
@@ -111,14 +122,18 @@ public class ReprojectionProcessor extends
StreamPipesDataProcessor {
// checks if Input EPSG in valid and exists in EPSG DB
if (!SpReprojectionBuilder.isSisEpsgValid(this.targetEpsg)) {
throw new SpRuntimeException("Your chosen EPSG Code " + this.targetEpsg
+ " is not valid. "
- + "Check EPSG on https://spatialreference.org");
+ + "Check EPSG on
https://spatialreference.org");
}
}
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String geom =
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
- Integer sourceEpsg =
event.getFieldBySelector(sourceEpsgMapper).getAsPrimitive().getAsInt();
+ String geom = event.getFieldBySelector(geometryMapper)
+ .getAsPrimitive()
+ .getAsString();
+ Integer sourceEpsg = event.getFieldBySelector(sourceEpsgMapper)
+ .getAsPrimitive()
+ .getAsInt();
Geometry geometry = SpGeometryBuilder.createSPGeom(geom, sourceEpsg);
@@ -136,12 +151,11 @@ public class ReprojectionProcessor extends
StreamPipesDataProcessor {
collector.collect(event);
} else {
LOG.warn("An empty point geometry is created"
- + " due invalid input values. Check used epsg Code:" + targetEpsg);
+ + " due invalid input values. Check used epsg Code:" +
targetEpsg);
}
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
index cdc18fcdab..a4112c664f 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
@@ -19,32 +19,33 @@
package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
import
org.apache.streampipes.processors.geo.jvm.jts.helper.SpTrajectoryBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.locationtech.jts.geom.LineString;
import org.locationtech.jts.geom.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
+public class TrajectoryFromPointsProcessor implements
IStreamPipesDataProcessor {
private static final String POINT_KEY = "point-key";
private static final String EPSG_KEY = "epsg-key";
private static final String M_KEY = "m-key";
@@ -63,78 +64,102 @@ public class TrajectoryFromPointsProcessor extends
StreamPipesDataProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(TrajectoryFromPointsProcessor.class);
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory", 0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(
- StreamRequirementsBuilder
- .create()
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq
- ("http://www.opengis.net/ont/geosparql#Geometry"),
- Labels.withId(POINT_KEY),
PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.semanticTypeReq
- ("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(EPSG_KEY),
PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.numberReq(),
- Labels.withId(M_KEY), PropertyScope.MEASUREMENT_PROPERTY)
- .build()
- )
- .requiredTextParameter(
- Labels.withId(DESCRIPTION_KEY))
- .requiredIntegerParameter(
- Labels.withId(SUBPOINTS_KEY),
- 2, 30, 1)
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ TrajectoryFromPointsProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory", 0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(
+ StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq
+
("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(POINT_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq
+
("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(EPSG_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(M_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build()
+ )
+ .requiredTextParameter(
+ Labels.withId(DESCRIPTION_KEY))
+ .requiredIntegerParameter(
+ Labels.withId(SUBPOINTS_KEY),
+ 2, 30, 1
+ )
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(
- Labels.withId(DESCRIPTION_KEY),
- DESCRIPTION_RUNTIME,
- SO.TEXT),
- EpProperties.stringEp(
- Labels.withId(TRAJECTORY_KEY),
- TRAJECTORY_GEOMETRY_RUNTIME,
- "http://www.opengis.net/ont/geosparql#Geometry"),
- EpProperties.integerEp(
- Labels.withId(EPSG_KEY),
- TRAJECTORY_EPSG_RUNTIME,
- "http://data.ign.fr/def/ignf#CartesianCS")
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(
+ Labels.withId(DESCRIPTION_KEY),
+ DESCRIPTION_RUNTIME,
+ SO.TEXT
+ ),
+ EpProperties.stringEp(
+ Labels.withId(TRAJECTORY_KEY),
+ TRAJECTORY_GEOMETRY_RUNTIME,
+
"http://www.opengis.net/ont/geosparql#Geometry"
+ ),
+ EpProperties.integerEp(
+ Labels.withId(EPSG_KEY),
+ TRAJECTORY_EPSG_RUNTIME,
+ "http://data.ign.fr/def/ignf#CartesianCS"
+ )
+ )
)
- )
- .build();
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- this.pointMapper = parameters.extractor().mappingPropertyValue(POINT_KEY);
- this.mValueMapper = parameters.extractor().mappingPropertyValue(M_KEY);
- this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
- this.description =
parameters.extractor().singleValueParameter(DESCRIPTION_KEY, String.class);
- this.subpoints =
parameters.extractor().singleValueParameter(SUBPOINTS_KEY, Integer.class);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.pointMapper = params.extractor()
+ .mappingPropertyValue(POINT_KEY);
+ this.mValueMapper = params.extractor()
+ .mappingPropertyValue(M_KEY);
+ this.epsgMapper = params.extractor()
+ .mappingPropertyValue(EPSG_KEY);
+ this.description = params.extractor()
+ .singleValueParameter(DESCRIPTION_KEY,
String.class);
+ this.subpoints = params.extractor()
+ .singleValueParameter(SUBPOINTS_KEY, Integer.class);
trajectory = new SpTrajectoryBuilder(subpoints, description);
}
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- // extract values
- String point =
event.getFieldBySelector(pointMapper).getAsPrimitive().getAsString();
- Integer epsg =
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
- Double m =
event.getFieldBySelector(mValueMapper).getAsPrimitive().getAsDouble();
- //create JTS geometry
+ String point = event.getFieldBySelector(pointMapper)
+ .getAsPrimitive()
+ .getAsString();
+ Integer epsg = event.getFieldBySelector(epsgMapper)
+ .getAsPrimitive()
+ .getAsInt();
+ Double m = event.getFieldBySelector(mValueMapper)
+ .getAsPrimitive()
+ .getAsDouble();
+
Point eventGeom = (Point) SpGeometryBuilder.createSPGeom(point, epsg);
LOG.debug("Geometry Point created");
- //adds point and m value to trajectory object
+
trajectory.addPointToTrajectory(eventGeom, m);
LOG.debug("Point added to trajectory");
- // returns JTS LineString
+
LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
- // adds to stream
+
event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
event.addField(TRAJECTORY_GEOMETRY_RUNTIME, geom.toString());
event.addField(TRAJECTORY_EPSG_RUNTIME, epsg);
@@ -142,7 +167,6 @@ public class TrajectoryFromPointsProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
index 74544e68df..dbd1992e05 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/complex/TopologyValidationProcessor.java
@@ -20,11 +20,13 @@ package
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.compl
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.monitoring.SpLogEntry;
import org.apache.streampipes.model.monitoring.SpLogMessage;
import org.apache.streampipes.model.runtime.Event;
@@ -34,20 +36,20 @@ import
org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
import
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.ValidationOutput;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.operation.valid.IsValidOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TopologyValidationProcessor extends StreamPipesDataProcessor {
+public class TopologyValidationProcessor implements IStreamPipesDataProcessor {
public static final String GEOM_KEY = "geom-key";
public static final String EPSG_KEY = "epsg-key";
public static final String VALIDATION_OUTPUT_KEY = "validation-output-key";
@@ -60,46 +62,57 @@ public class TopologyValidationProcessor extends
StreamPipesDataProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(TopologyValidationProcessor.class);
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.complex",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
- Labels.withId(GEOM_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(EPSG_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .outputStrategy(OutputStrategies.keep())
- .requiredSingleValueSelection(
- Labels.withId(VALIDATION_OUTPUT_KEY),
- Options.from(
- ValidationOutput.VALID.name(),
- ValidationOutput.INVALID.name()
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ TopologyValidationProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.complex",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(GEOM_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(EPSG_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .requiredSingleValueSelection(
+ Labels.withId(VALIDATION_OUTPUT_KEY),
+ Options.from(
+ ValidationOutput.VALID.name(),
+ ValidationOutput.INVALID.name()
+ )
)
- )
- .requiredSlideToggle(
- Labels.withId(LOG_OUTPUT_KEY),
- false)
- .build();
+ .requiredSlideToggle(
+ Labels.withId(LOG_OUTPUT_KEY),
+ false
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
-
- this.params = parameters;
- this.geometryMapper =
parameters.extractor().mappingPropertyValue(GEOM_KEY);
- this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
- this.isLogOutput = parameters.extractor().slideToggleValue(LOG_OUTPUT_KEY);
- String readValidationOutput =
parameters.extractor().selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.geometryMapper = params.extractor()
+ .mappingPropertyValue(GEOM_KEY);
+ this.epsgMapper = params.extractor()
+ .mappingPropertyValue(EPSG_KEY);
+ this.isLogOutput = params.extractor()
+ .slideToggleValue(LOG_OUTPUT_KEY);
+ String readValidationOutput = params.extractor()
+
.selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
if (readValidationOutput.equals(ValidationOutput.VALID.name())) {
this.outputChoice = ValidationOutput.VALID.name();
@@ -110,19 +123,29 @@ public class TopologyValidationProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String geom =
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
- Integer epsg =
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+ String geom = event.getFieldBySelector(geometryMapper)
+ .getAsPrimitive()
+ .getAsString();
+ Integer epsg = event.getFieldBySelector(epsgMapper)
+ .getAsPrimitive()
+ .getAsInt();
Geometry geometry = SpGeometryBuilder.createSPGeom(geom, epsg);
IsValidOp validator = new IsValidOp(geometry);
validator.setSelfTouchingRingFormingHoleValid(true);
boolean itIsValid = validator.isValid();
- if (!itIsValid){
+ if (!itIsValid) {
if (isLogOutput) {
-
SpMonitoringManager.INSTANCE.addErrorMessage(params.getGraph().getElementId(),
- SpLogEntry.from(System.currentTimeMillis(),
+ SpMonitoringManager.INSTANCE.addErrorMessage(
+ params.getGraph()
+ .getElementId(),
+ SpLogEntry.from(
+ System.currentTimeMillis(),
SpLogMessage.from(new SpJtsGeoemtryException(
- validator.getValidationError().toString()))));
+ validator.getValidationError()
+ .toString()))
+ )
+ );
}
}
@@ -138,7 +161,6 @@ public class TopologyValidationProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
index 48362ef36f..36526f6c72 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/validation/simple/GeometryValidationProcessor.java
@@ -19,11 +19,13 @@
package
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.simple;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
@@ -31,13 +33,12 @@ import
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.Valida
import
org.apache.streampipes.processors.geo.jvm.jts.processor.validation.ValidationType;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
-public class GeometryValidationProcessor extends StreamPipesDataProcessor {
+public class GeometryValidationProcessor implements IStreamPipesDataProcessor {
public static final String GEOM_KEY = "geom-key";
public static final String EPSG_KEY = "epsg-key";
public static final String VALIDATION_OUTPUT_KEY = "validation-output-key";
@@ -60,50 +61,61 @@ public class GeometryValidationProcessor extends
StreamPipesDataProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(GeometryValidationProcessor.class);
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.simple",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
- Labels.withId(GEOM_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
-
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(EPSG_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .outputStrategy(OutputStrategies.keep())
- .requiredSingleValueSelection(
- Labels.withId(VALIDATION_OUTPUT_KEY),
- Options.from(
- ValidationOutput.VALID.name(),
- ValidationOutput.INVALID.name()
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ GeometryValidationProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.jts.processor.validation.simple",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(GEOM_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(EPSG_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .requiredSingleValueSelection(
+ Labels.withId(VALIDATION_OUTPUT_KEY),
+ Options.from(
+ ValidationOutput.VALID.name(),
+ ValidationOutput.INVALID.name()
+ )
)
- )
- .requiredMultiValueSelection(
- Labels.withId(VALIDATION_TYPE_KEY),
- Options.from(
- ValidationType.IsEmpty.name(),
- ValidationType.IsSimple.name()
+ .requiredMultiValueSelection(
+ Labels.withId(VALIDATION_TYPE_KEY),
+ Options.from(
+ ValidationType.IsEmpty.name(),
+ ValidationType.IsSimple.name()
+ )
)
- )
- .build();
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
-
- this.geometryMapper =
parameters.extractor().mappingPropertyValue(GEOM_KEY);
- this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
-
- String readValidationOutput =
parameters.extractor().selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
- List<String> readValidationType =
parameters.extractor().selectedMultiValues(VALIDATION_TYPE_KEY, String.class);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.geometryMapper = params.extractor()
+ .mappingPropertyValue(GEOM_KEY);
+ this.epsgMapper = params.extractor()
+ .mappingPropertyValue(EPSG_KEY);
+
+ String readValidationOutput = params.extractor()
+
.selectedSingleValue(VALIDATION_OUTPUT_KEY, String.class);
+ List<String> readValidationType = params.extractor()
+
.selectedMultiValues(VALIDATION_TYPE_KEY, String.class);
if (readValidationOutput.equals(ValidationOutput.VALID.name())) {
this.outputChoice = ValidationOutput.VALID.name();
@@ -116,10 +128,12 @@ public class GeometryValidationProcessor extends
StreamPipesDataProcessor {
this.isEmptySelected = true;
this.isSimpleSelected = true;
this.isMultiSelected = true;
- } else if
(readValidationType.get(0).equals(ValidationType.IsEmpty.name())) {
+ } else if (readValidationType.get(0)
+ .equals(ValidationType.IsEmpty.name())) {
this.isEmptySelected = true;
this.isSimpleSelected = false;
- } else if
(readValidationType.get(0).equals(ValidationType.IsSimple.name())) {
+ } else if (readValidationType.get(0)
+ .equals(ValidationType.IsSimple.name())) {
this.isEmptySelected = false;
this.isSimpleSelected = true;
}
@@ -127,8 +141,12 @@ public class GeometryValidationProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String geom =
event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
- Integer sourceEpsg =
event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+ String geom = event.getFieldBySelector(geometryMapper)
+ .getAsPrimitive()
+ .getAsString();
+ Integer sourceEpsg = event.getFieldBySelector(epsgMapper)
+ .getAsPrimitive()
+ .getAsInt();
Geometry geometry = SpGeometryBuilder.createSPGeom(geom, sourceEpsg);
@@ -154,7 +172,6 @@ public class GeometryValidationProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
index a80a143be0..b1aa67add9 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
@@ -19,17 +19,20 @@
package
org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversine;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
@@ -37,12 +40,10 @@ import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.net.URI;
-public class HaversineDistanceCalculatorProcessor extends
StreamPipesDataProcessor {
+public class HaversineDistanceCalculatorProcessor implements
IStreamPipesDataProcessor {
private static final String LAT_1_KEY = "lat1";
private static final String LONG_1_KEY = "long1";
private static final String LAT_2_KEY = "lat2";
@@ -54,51 +55,79 @@ public class HaversineDistanceCalculatorProcessor extends
StreamPipesDataProcess
String long2FieldMapper;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(
-
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversine",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
- Labels.withId(LAT_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG),
- Labels.withId(LONG_1_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
- Labels.withId(LAT_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG),
- Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
- .build()
- )
- .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
- .create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
- .semanticType(SO.NUMBER)
-
.measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
- .build())
- )
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ HaversineDistanceCalculatorProcessor::new,
+ ProcessingElementBuilder.create(
+
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversine",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq(Geo.LAT),
+
Labels.withId(LAT_1_KEY), PropertyScope.MEASUREMENT_PROPERTY
+ )
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq(Geo.LNG),
+
Labels.withId(LONG_1_KEY), PropertyScope.MEASUREMENT_PROPERTY
+ )
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq(Geo.LAT),
+
Labels.withId(LAT_2_KEY), PropertyScope.MEASUREMENT_PROPERTY
+ )
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq(Geo.LNG),
+
Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build()
+ )
+
.outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+
.create(
+
Datatypes.Float,
+
DISTANCE_RUNTIME_NAME
+ )
+
.semanticType(SO.NUMBER)
+
.measurementUnit(URI.create(
+
"http://qudt.org/vocab/unit#Kilometer"))
+
.build())
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
-
- this.lat1FieldMapper =
parameters.extractor().mappingPropertyValue(LAT_1_KEY);
- this.long1FieldMapper =
parameters.extractor().mappingPropertyValue(LONG_1_KEY);
- this.lat2FieldMapper =
parameters.extractor().mappingPropertyValue(LAT_2_KEY);
- this.long2FieldMapper =
parameters.extractor().mappingPropertyValue(LONG_2_KEY);
-
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.lat1FieldMapper = params.extractor()
+ .mappingPropertyValue(LAT_1_KEY);
+ this.long1FieldMapper = params.extractor()
+ .mappingPropertyValue(LONG_1_KEY);
+ this.lat2FieldMapper = params.extractor()
+ .mappingPropertyValue(LAT_2_KEY);
+ this.long2FieldMapper = params.extractor()
+ .mappingPropertyValue(LONG_2_KEY);
}
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- float lat1 =
event.getFieldBySelector(lat1FieldMapper).getAsPrimitive().getAsFloat();
- float long1 =
event.getFieldBySelector(long1FieldMapper).getAsPrimitive().getAsFloat();
- float lat2 =
event.getFieldBySelector(lat2FieldMapper).getAsPrimitive().getAsFloat();
- float long2 =
event.getFieldBySelector(long2FieldMapper).getAsPrimitive().getAsFloat();
+ float lat1 = event.getFieldBySelector(lat1FieldMapper)
+ .getAsPrimitive()
+ .getAsFloat();
+ float long1 = event.getFieldBySelector(long1FieldMapper)
+ .getAsPrimitive()
+ .getAsFloat();
+ float lat2 = event.getFieldBySelector(lat2FieldMapper)
+ .getAsPrimitive()
+ .getAsFloat();
+ float long2 = event.getFieldBySelector(long2FieldMapper)
+ .getAsPrimitive()
+ .getAsFloat();
double resultDist = HaversineDistanceUtil.dist(lat1, long1, lat2, long2);
@@ -108,7 +137,6 @@ public class HaversineDistanceCalculatorProcessor extends
StreamPipesDataProcess
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
index 187dce6633..a26cf0e6cb 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversinestatic/HaversineStaticDistanceCalculatorProcessor.java
@@ -19,17 +19,20 @@
package
org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversinestatic;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
@@ -37,12 +40,10 @@ import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.net.URI;
-public class HaversineStaticDistanceCalculatorProcessor extends
StreamPipesDataProcessor {
+public class HaversineStaticDistanceCalculatorProcessor implements
IStreamPipesDataProcessor {
private static final String LATITUDE_KEY = "latitude-key";
private static final String LONGITUDE_KEY = "longitude-key";
@@ -56,59 +57,77 @@ public class HaversineStaticDistanceCalculatorProcessor
extends StreamPipesDataP
Float selectedLongitude;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(
-
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversinestatic",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.semanticTypeReq(Geo.LAT),
- Labels.withId(LATITUDE_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.semanticTypeReq(Geo.LNG),
- Labels.withId(LONGITUDE_KEY),
- PropertyScope.MEASUREMENT_PROPERTY)
- .build()
- )
- .requiredFloatParameter(Labels.withId(SELECTED_LATITUDE_KEY))
- .requiredFloatParameter(Labels.withId(SELECTED_LONGITUDE_KEY))
- .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
- .create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
- .semanticType(SO.NUMBER)
-
.measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
- .build())
- )
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ HaversineStaticDistanceCalculatorProcessor::new,
+ ProcessingElementBuilder.create(
+
"org.apache.streampipes.processors.geo.jvm.latlong.processor.distancecalculator.haversinestatic",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq(Geo.LAT),
+
Labels.withId(LATITUDE_KEY),
+
PropertyScope.MEASUREMENT_PROPERTY
+ )
+
.requiredPropertyWithUnaryMapping(
+
EpRequirements.semanticTypeReq(Geo.LNG),
+
Labels.withId(LONGITUDE_KEY),
+
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build()
+ )
+
.requiredFloatParameter(Labels.withId(SELECTED_LATITUDE_KEY))
+
.requiredFloatParameter(Labels.withId(SELECTED_LONGITUDE_KEY))
+
.outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+
.create(
+
Datatypes.Float,
+
DISTANCE_RUNTIME_NAME
+ )
+
.semanticType(SO.NUMBER)
+
.measurementUnit(URI.create(
+
"http://qudt.org/vocab/unit#Kilometer"))
+
.build())
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- this.latitudeFieldMapper =
parameters.extractor().mappingPropertyValue(LATITUDE_KEY);
- this.longitudeFieldMapper =
parameters.extractor().mappingPropertyValue(LONGITUDE_KEY);
- this.selectedLatitude =
parameters.extractor().singleValueParameter(SELECTED_LATITUDE_KEY, Float.class);
- this.selectedLongitude =
parameters.extractor().singleValueParameter(SELECTED_LONGITUDE_KEY,
Float.class);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.latitudeFieldMapper = params.extractor()
+ .mappingPropertyValue(LATITUDE_KEY);
+ this.longitudeFieldMapper = params.extractor()
+ .mappingPropertyValue(LONGITUDE_KEY);
+ this.selectedLatitude = params.extractor()
+ .singleValueParameter(SELECTED_LATITUDE_KEY,
Float.class);
+ this.selectedLongitude = params.extractor()
+
.singleValueParameter(SELECTED_LONGITUDE_KEY, Float.class);
}
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- Float latitude =
event.getFieldBySelector(latitudeFieldMapper).getAsPrimitive().getAsFloat();
- Float longitude =
event.getFieldBySelector(longitudeFieldMapper).getAsPrimitive().getAsFloat();
+ Float latitude = event.getFieldBySelector(latitudeFieldMapper)
+ .getAsPrimitive()
+ .getAsFloat();
+ Float longitude = event.getFieldBySelector(longitudeFieldMapper)
+ .getAsPrimitive()
+ .getAsFloat();
Float distance = HaversineDistanceUtil.dist(latitude, longitude,
selectedLatitude, selectedLongitude);
event.addField(DISTANCE_RUNTIME_NAME, distance);
collector.collect(event);
-
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
index b42df348fa..37ebf68d1e 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemaps/GoogleMapsGeocoderProcessor.java
@@ -19,24 +19,25 @@
package
org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemaps;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import com.google.maps.GeoApiContext;
import com.google.maps.GeocodingApi;
@@ -45,7 +46,7 @@ import com.google.maps.model.GeocodingResult;
import java.io.IOException;
-public class GoogleMapsGeocoderProcessor extends StreamPipesDataProcessor {
+public class GoogleMapsGeocoderProcessor implements IStreamPipesDataProcessor {
private static final String GEOCODER_REQUEST_KEY = "geocoder-request-key";
private static final String LAT_RUNTIME_NAME = "geocoder-latitude";
private static final String LONG_RUNTIME_NAME = "geocoder-longitude";
@@ -53,33 +54,43 @@ public class GoogleMapsGeocoderProcessor extends
StreamPipesDataProcessor {
private GeoApiContext apiContext;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemaps",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
- Labels.withId(GEOCODER_REQUEST_KEY),
- PropertyScope.NONE)
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ GoogleMapsGeocoderProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemaps",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.stringReq(),
+ Labels.withId(GEOCODER_REQUEST_KEY),
+ PropertyScope.NONE
+ )
+ .build()
+ )
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.doubleEp(Labels.empty(),
LAT_RUNTIME_NAME, Geo.LAT),
+ EpProperties.stringEp(Labels.empty(),
LONG_RUNTIME_NAME, Geo.LNG)
+ )
+ )
.build()
- )
- .outputStrategy(OutputStrategies.append(
- EpProperties.doubleEp(Labels.empty(), LAT_RUNTIME_NAME, Geo.LAT),
- EpProperties.stringEp(Labels.empty(), LONG_RUNTIME_NAME, Geo.LNG))
- )
- .build();
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.geocoderRequest = params.extractor()
+ .mappingPropertyValue(GEOCODER_REQUEST_KEY);
- this.geocoderRequest =
parameters.extractor().mappingPropertyValue(GEOCODER_REQUEST_KEY);
-
- String googleMapsApiKey =
runtimeContext.getConfigStore().getString(ConfigKeys.GOOGLE_API_KEY);
+ String googleMapsApiKey = runtimeContext.getConfigStore()
+
.getString(ConfigKeys.GOOGLE_API_KEY);
if (googleMapsApiKey == null || googleMapsApiKey.equals("")) {
throw new SpRuntimeException("Could not start Geocoder. Did you forget
to add a Google Maps API key?");
@@ -88,18 +99,17 @@ public class GoogleMapsGeocoderProcessor extends
StreamPipesDataProcessor {
.apiKey(googleMapsApiKey)
.build();
}
- // TODO: RequestDeniedException
- // You must enable Billing on the Google Cloud Project at
- // https://console.cloud.google.com/project/_/billing/enable
- // Learn more at https://developers.google.com/maps/gmp
}
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String request =
event.getFieldBySelector(geocoderRequest).getAsPrimitive().getAsString();
+ String request = event.getFieldBySelector(geocoderRequest)
+ .getAsPrimitive()
+ .getAsString();
try {
- GeocodingResult[] results = GeocodingApi.geocode(apiContext,
request).await();
+ GeocodingResult[] results = GeocodingApi.geocode(apiContext, request)
+ .await();
Double latitude = results[0].geometry.location.lat;
Double longitude = results[0].geometry.location.lng;
@@ -116,7 +126,6 @@ public class GoogleMapsGeocoderProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
index 83d2de7ae1..13a4a9280e 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/geocoder/googlemapsstatic/GoogleMapsStaticGeocoderProcessor.java
@@ -19,23 +19,24 @@
package
org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemapsstatic;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import com.google.maps.GeoApiContext;
import com.google.maps.GeocodingApi;
@@ -44,7 +45,7 @@ import com.google.maps.model.GeocodingResult;
import java.io.IOException;
-public class GoogleMapsStaticGeocoderProcessor extends
StreamPipesDataProcessor {
+public class GoogleMapsStaticGeocoderProcessor implements
IStreamPipesDataProcessor {
private static final String STATIC_GEOCODER_REQUEST_KEY = "sg-request-key";
private static final String LAT_RUNTIME_NAME = "staticgeocoder-latitude";
private static final String LONG_RUNTIME_NAME = "staticgeocoder-longitude";
@@ -54,49 +55,57 @@ public class GoogleMapsStaticGeocoderProcessor extends
StreamPipesDataProcessor
private Double responseLongitude;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemapsstatic",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(
- StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build()
- )
- .requiredTextParameter(Labels.withId(STATIC_GEOCODER_REQUEST_KEY))
- .outputStrategy(OutputStrategies.append(
- EpProperties.doubleEp(Labels.empty(), LAT_RUNTIME_NAME, Geo.LAT),
- EpProperties.stringEp(Labels.empty(), LONG_RUNTIME_NAME, Geo.LNG))
- )
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ GoogleMapsStaticGeocoderProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.geocoder.googlemapsstatic",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(
+ StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build()
+ )
+ .requiredTextParameter(Labels.withId(STATIC_GEOCODER_REQUEST_KEY))
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.doubleEp(Labels.empty(),
LAT_RUNTIME_NAME, Geo.LAT),
+ EpProperties.stringEp(Labels.empty(),
LONG_RUNTIME_NAME, Geo.LNG)
+ )
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
-
- this.staticGeocoderRequest =
-
parameters.extractor().singleValueParameter(STATIC_GEOCODER_REQUEST_KEY,
String.class);
- String googleMapsApiKey =
runtimeContext.getConfigStore().getString(ConfigKeys.GOOGLE_API_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.staticGeocoderRequest = params.extractor()
+
.singleValueParameter(STATIC_GEOCODER_REQUEST_KEY, String.class);
+ String googleMapsApiKey = runtimeContext.getConfigStore()
+
.getString(ConfigKeys.GOOGLE_API_KEY);
if (googleMapsApiKey == null || googleMapsApiKey.equals("")) {
- throw new SpRuntimeException("Could not start Geocoder. Did you forget
to add a Google Maps" + " API key?");
+ throw new SpRuntimeException("Could not start Geocoder. Did you forget
to add a Google Maps API key?");
}
- this.apiContext = new
GeoApiContext.Builder().apiKey(googleMapsApiKey).build();
+ this.apiContext = new GeoApiContext.Builder().apiKey(googleMapsApiKey)
+ .build();
try {
- GeocodingResult[] results = GeocodingApi.geocode(apiContext,
staticGeocoderRequest).await();
+ GeocodingResult[] results = GeocodingApi.geocode(apiContext,
staticGeocoderRequest)
+ .await();
this.responseLatitude = results[0].geometry.location.lat;
this.responseLongitude = results[0].geometry.location.lng;
} catch (ApiException | IOException | InterruptedException e) {
e.printStackTrace();
throw new SpRuntimeException("Could not fetch geocoding result");
}
-
}
@Override
@@ -105,11 +114,9 @@ public class GoogleMapsStaticGeocoderProcessor extends
StreamPipesDataProcessor
event.addField(LONG_RUNTIME_NAME, responseLongitude);
collector.collect(event);
-
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
index 67c40690b7..8e0d491fe2 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/revgeocoder/geocityname/GeoCityNameRevdecodeProcessor.java
@@ -19,25 +19,26 @@
package
org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname.geocode.GeoName;
import
org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname.geocode.ReverseGeoCode;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.Geo;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.apache.http.client.fluent.Request;
@@ -46,7 +47,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.zip.ZipInputStream;
-public class GeoCityNameRevdecodeProcessor extends StreamPipesDataProcessor {
+public class GeoCityNameRevdecodeProcessor implements
IStreamPipesDataProcessor {
private static final String LATITUDE_MAPPING_KEY = "latitude-mapping-key";
private static final String LONGITUDE_MAPPING_KEY = "longitude-mapping-key";
private static final String GEONAME_RUNTIME_NAME = "geoname";
@@ -57,38 +58,49 @@ public class GeoCityNameRevdecodeProcessor extends
StreamPipesDataProcessor {
private ReverseGeoCode reverseGeoCode;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT),
- Labels.withId(LATITUDE_MAPPING_KEY),
- PropertyScope.NONE)
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG),
- Labels.withId(LONGITUDE_MAPPING_KEY),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(Labels.empty(), GEONAME_RUNTIME_NAME,
"http://schema.org/city")
- ))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ GeoCityNameRevdecodeProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.revgeocoder.geocityname",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq(Geo.LAT),
+ Labels.withId(LATITUDE_MAPPING_KEY),
+ PropertyScope.NONE
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq(Geo.LNG),
+ Labels.withId(LONGITUDE_MAPPING_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(Labels.empty(), GEONAME_RUNTIME_NAME,
"http://schema.org/city")
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- this.latitudeFieldMapper =
parameters.extractor().mappingPropertyValue(LATITUDE_MAPPING_KEY);
- this.longitudeFieldMapper =
parameters.extractor().mappingPropertyValue(LONGITUDE_MAPPING_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.latitudeFieldMapper = params.extractor()
+
.mappingPropertyValue(LATITUDE_MAPPING_KEY);
+ this.longitudeFieldMapper = params.extractor()
+
.mappingPropertyValue(LONGITUDE_MAPPING_KEY);
try {
InputStream stream = downloadCitiesDataSet();
if (stream != null) {
- ZipInputStream zipInputStream = null;
- zipInputStream = new ZipInputStream(stream);
+ ZipInputStream zipInputStream = new ZipInputStream(stream);
this.reverseGeoCode = new ReverseGeoCode(zipInputStream, false);
}
} catch (IOException e) {
@@ -99,8 +111,12 @@ public class GeoCityNameRevdecodeProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- Double latitude =
event.getFieldBySelector(latitudeFieldMapper).getAsPrimitive().getAsDouble();
- Double longitude =
event.getFieldBySelector(longitudeFieldMapper).getAsPrimitive().getAsDouble();
+ Double latitude = event.getFieldBySelector(latitudeFieldMapper)
+ .getAsPrimitive()
+ .getAsDouble();
+ Double longitude = event.getFieldBySelector(longitudeFieldMapper)
+ .getAsPrimitive()
+ .getAsDouble();
GeoName geoName = this.reverseGeoCode.nearestPlace(latitude, longitude);
@@ -109,12 +125,14 @@ public class GeoCityNameRevdecodeProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
private InputStream downloadCitiesDataSet() throws IOException {
- byte[] citiesDataset =
Request.Get(CITIES_DATASET_URL).execute().returnContent().asBytes();
+ byte[] citiesDataset = Request.Get(CITIES_DATASET_URL)
+ .execute()
+ .returnContent()
+ .asBytes();
return new ByteArrayInputStream(citiesDataset);
}
}
diff --git
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
index a67f3bfd64..3165e66f5f 100644
---
a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
+++
b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
@@ -19,17 +19,20 @@
package
org.apache.streampipes.processors.geo.jvm.latlong.processor.speedcalculator;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
@@ -37,14 +40,12 @@ import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import java.net.URI;
-public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
+public class SpeedCalculatorProcessor implements IStreamPipesDataProcessor {
private static final String TIMESTAMP_KEY = "timestamp-key";
private static final String LATITUDE_KEY = "latitude-key";
private static final String LONGITUDE_KEY = "longitude-key";
@@ -57,39 +58,55 @@ public class SpeedCalculatorProcessor extends
StreamPipesDataProcessor {
private CircularFifoBuffer buffer;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.speedcalculator",
0)
- .category(DataProcessorType.GEO)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_KEY), PropertyScope.HEADER_PROPERTY)
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LAT)
- , Labels.withId(LATITUDE_KEY),
PropertyScope.MEASUREMENT_PROPERTY)
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq(Geo.LNG)
- , Labels.withId(LONGITUDE_KEY),
PropertyScope.MEASUREMENT_PROPERTY)
- .build())
- .requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
- .outputStrategy(
- OutputStrategies.append(PrimitivePropertyBuilder
- .create(Datatypes.Float, SPEED_RUNTIME_NAME)
- .semanticType(SO.NUMBER)
-
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
- .build())
- )
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ SpeedCalculatorProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.geo.jvm.latlong.processor.speedcalculator",
0)
+ .category(DataProcessorType.GEO)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_KEY),
PropertyScope.HEADER_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq(Geo.LAT),
+ Labels.withId(LATITUDE_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.semanticTypeReq(Geo.LNG),
+ Labels.withId(LONGITUDE_KEY),
PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build())
+ .requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
+ .outputStrategy(
+ OutputStrategies.append(PrimitivePropertyBuilder
+ .create(Datatypes.Float,
SPEED_RUNTIME_NAME)
+ .semanticType(SO.NUMBER)
+
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
+ .build())
+ )
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- this.latitudeFieldMapper =
parameters.extractor().mappingPropertyValue(LATITUDE_KEY);
- this.longitudeFieldMapper =
parameters.extractor().mappingPropertyValue(LONGITUDE_KEY);
- this.timestampFieldMapper =
parameters.extractor().mappingPropertyValue(TIMESTAMP_KEY);
- this.countWindowSize =
parameters.extractor().singleValueParameter(COUNT_WINDOW_KEY, Integer.class);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ this.latitudeFieldMapper = params.extractor()
+ .mappingPropertyValue(LATITUDE_KEY);
+ this.longitudeFieldMapper = params.extractor()
+ .mappingPropertyValue(LONGITUDE_KEY);
+ this.timestampFieldMapper = params.extractor()
+ .mappingPropertyValue(TIMESTAMP_KEY);
+ this.countWindowSize = params.extractor()
+ .singleValueParameter(COUNT_WINDOW_KEY,
Integer.class);
this.buffer = new CircularFifoBuffer(countWindowSize);
}
@@ -105,8 +122,7 @@ public class SpeedCalculatorProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
private double calculateSpeed(Event firstEvent, Event currentEvent) {
@@ -118,8 +134,10 @@ public class SpeedCalculatorProcessor extends
StreamPipesDataProcessor {
Float currentLongitude = getFloat(currentEvent, this.longitudeFieldMapper);
Long currentTimestamp = getLong(currentEvent, this.timestampFieldMapper);
- Float distanceInKm = HaversineDistanceUtil.dist(firstLatitude,
firstLongitude, currentLatitude,
- currentLongitude);
+ Float distanceInKm = HaversineDistanceUtil.dist(
+ firstLatitude, firstLongitude, currentLatitude,
+ currentLongitude
+ );
Double durationInSeconds = Double.valueOf((currentTimestamp -
firstTimestamp) / 1000.0);
@@ -130,10 +148,14 @@ public class SpeedCalculatorProcessor extends
StreamPipesDataProcessor {
}
private Long getLong(Event event, String fieldName) {
- return event.getFieldBySelector(fieldName).getAsPrimitive().getAsLong();
+ return event.getFieldBySelector(fieldName)
+ .getAsPrimitive()
+ .getAsLong();
}
private Float getFloat(Event event, String fieldName) {
- return event.getFieldBySelector(fieldName).getAsPrimitive().getAsFloat();
+ return event.getFieldBySelector(fieldName)
+ .getAsPrimitive()
+ .getAsFloat();
}
}