This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 317f5da15 Implemented unit tests for
SwingingDoorTrendingFilterProcessor (#2230)
317f5da15 is described below
commit 317f5da151cc8fe5748ace24a1ba6a01bd01be1d
Author: Isaak Krut <[email protected]>
AuthorDate: Tue Nov 28 10:57:38 2023 -0500
Implemented unit tests for SwingingDoorTrendingFilterProcessor (#2230)
---
.../sdt/SwingingDoorTrendingFilterProcessor.java | 10 +-
.../TestSwingingDoorTrendingFilterProcessor.java | 182 +++++++++++++++++++++
2 files changed, 187 insertions(+), 5 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/SwingingDoorTrendingFilterProcessor.java
b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/SwingingDoorTrendingFilterProcessor.java
index e13e52d4b..5f2c3322d 100644
---
a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/SwingingDoorTrendingFilterProcessor.java
+++
b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/SwingingDoorTrendingFilterProcessor.java
@@ -37,11 +37,11 @@ import
org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
public class SwingingDoorTrendingFilterProcessor extends
StreamPipesDataProcessor {
- private static final String SDT_TIMESTAMP_FIELD_KEY = "sdt-timestamp-field";
- private static final String SDT_VALUE_FIELD_KEY = "sdt-value-field";
- private static final String SDT_COMPRESSION_DEVIATION_KEY =
"sdt-compression-deviation";
- private static final String SDT_COMPRESSION_MIN_INTERVAL_KEY =
"sdt-compression-min-interval";
- private static final String SDT_COMPRESSION_MAX_INTERVAL_KEY =
"sdt-compression-max-interval";
+ public static final String SDT_TIMESTAMP_FIELD_KEY = "sdt-timestamp-field";
+ public static final String SDT_VALUE_FIELD_KEY = "sdt-value-field";
+ public static final String SDT_COMPRESSION_DEVIATION_KEY =
"sdt-compression-deviation";
+ public static final String SDT_COMPRESSION_MIN_INTERVAL_KEY =
"sdt-compression-min-interval";
+ public static final String SDT_COMPRESSION_MAX_INTERVAL_KEY =
"sdt-compression-max-interval";
private String sdtTimestampField;
private String sdtValueField;
diff --git
a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java
b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java
new file mode 100644
index 000000000..db2f93f78
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.sdt;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import
org.apache.streampipes.processors.filters.jvm.processor.merge.TestMergeByTimeProcessor;
+import org.apache.streampipes.test.extensions.api.StoreEventCollector;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestSwingingDoorTrendingFilterProcessor {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestMergeByTimeProcessor.class);
+
+ @org.junit.runners.Parameterized.Parameter
+ public String testName;
+ @org.junit.runners.Parameterized.Parameter(1)
+ public String sdtCompressionDeviation;
+ @org.junit.runners.Parameterized.Parameter(2)
+ public String sdtCompressionMinTimeInterval;
+ @org.junit.runners.Parameterized.Parameter(3)
+ public String sdtCompressionMaxTimeInterval;
+ @org.junit.runners.Parameterized.Parameter(4)
+ public int expectedFilteredCount;
+ @org.junit.runners.Parameterized.Parameter(5)
+ public List<Pair<Long, Double>> eventSettings;
+ @org.junit.runners.Parameterized.Parameter(6)
+ public boolean expectException;
+ @org.junit.runners.Parameterized.Parameter(7)
+ public String expectedErrorMessage;
+
+ private final String sdtTimestampField = "sdtTimestampField";
+ private final String sdtValueField = "sdtValueField";
+
+ @org.junit.runners.Parameterized.Parameters
+ public static Iterable<Object[]> data() {
+
+ return Arrays.asList(new Object[][]{
+ {"testWithOneEvent", "10.0", "100", "500", 1, List.of(Pair.of(9,
50.0)), false, ""},
+ {"testFullFilter", "10.0", "100", "500", 4, List.of(
+ Pair.of(0, 50.0), //true
+ Pair.of(50, 50.0), //false
+ Pair.of(200, 100.0), //false
+ Pair.of(270, 140.0), //false
+ Pair.of(300, 250.0), //true
+ Pair.of(900, 500.0), //true
+ Pair.of(1100, 800.0), //false
+ Pair.of(1250, 1600.0) //true
+ )
+ , false, ""},
+ {"testWithNegativeCompressionDeviation", "-10.0", "100", "500", 1,
new ArrayList<>(), true
+ , "Compression Deviation should be positive!"},
+ {"testWithNegativeMinInterval", "10.0", "-100", "500", 1, new
ArrayList<>(), true
+ , "Compression Minimum Time Interval should be >= 0!"},
+ {"testWithMinInterval>MaxInterval", "10.0", "1000", "500", 1, new
ArrayList<>(), true
+ , "Compression Minimum Time Interval should be <
Compression Maximum Time Interval!"}
+ });
+ }
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @Test
+ public void testSdtFilter() {
+ LOG.info("Executing test: {}", testName);
+ SwingingDoorTrendingFilterProcessor processor = new
SwingingDoorTrendingFilterProcessor();
+ DataProcessorDescription originalGraph = processor.declareModel();
+
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
+
+ DataProcessorInvocation graph =
+ InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
+
+ ProcessorParams params = new ProcessorParams(graph);
+
params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY
+ , MappingPropertyUnary.class)
+ .setSelectedProperty("test::" + sdtTimestampField);
+
params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY
+ , MappingPropertyUnary.class)
+ .setSelectedProperty("test::" + sdtValueField);
+
params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY
+ , FreeTextStaticProperty.class)
+ .setValue(sdtCompressionDeviation);
+
params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY
+ , FreeTextStaticProperty.class)
+ .setValue(sdtCompressionMinTimeInterval);
+
params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY
+ , FreeTextStaticProperty.class)
+ .setValue(sdtCompressionMaxTimeInterval);
+
+ if (expectException){
+ LOG.info("Expecting Error Message: {}", expectedErrorMessage);
+ exceptionRule.expect(SpRuntimeException.class);
+ exceptionRule.expectMessage(expectedErrorMessage);
+ }
+
+ StoreEventCollector eventCollector = new StoreEventCollector();
+ processor.onInvocation(params, eventCollector, null);
+
+ int result = sendEvents(processor, eventCollector);
+
+ LOG.info("Expected SDT filtered count is: {}", expectedFilteredCount);
+ LOG.info("Actual SDT filtered count is: {}", result);
+ assertEquals(expectedFilteredCount, result);
+ }
+
+
+ private int sendEvents(SwingingDoorTrendingFilterProcessor processor,
StoreEventCollector collector) {
+ List<Event> events = makeEvents();
+ for (Event event : events) {
+ LOG.info("Sending event with timestamp: "
+ + event.getFieldBySelector("test::" +
sdtTimestampField).getAsPrimitive().getAsLong()
+ + ", and value: "
+ + event.getFieldBySelector("test::" +
sdtValueField).getAsPrimitive().getAsFloat());
+ processor.onEvent(event, collector);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return collector.getEvents().size();
+ }
+
+ private List<Event> makeEvents() {
+ List<Event> events = new ArrayList<>();
+ for (Pair<Long, Double> eventSetting: eventSettings) {
+ events.add(makeEvent(eventSetting));
+ }
+ return events;
+ }
+
+ private Event makeEvent(Pair<Long, Double> eventSetting) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(sdtTimestampField, eventSetting.getKey());
+ map.put(sdtValueField, eventSetting.getValue());
+ return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "test"),
+ new SchemaInfo(null, new ArrayList<>()));
+ }
+}
\ No newline at end of file