This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch SP-1121 in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit ecbc43024fe96f390b8bd8f3fc4f96d7967bb068 Author: Philipp Zehnder <[email protected]> AuthorDate: Wed Jan 18 21:54:27 2023 +0100 [#1121] Add replay once option to FileStreamProtocol --- .../iiot/protocol/stream/FileStreamProtocol.java | 65 ++++++++++++++-------- .../strings.en | 3 + 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java index 4b588f882..ba6e2a5d2 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java @@ -54,6 +54,12 @@ import java.util.concurrent.TimeUnit; public class FileStreamProtocol extends Protocol { + private static final String REPLACE_TIMESTAMP = "replaceTimestamp"; + private static final String SPEED = "speed"; + private static final String FILE_PATH = "filePath"; + private static final String REPLAY_ONCE = "replayOnce"; + + private static final Logger logger = LoggerFactory.getLogger(FileStreamProtocol.class); public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.file"; @@ -61,6 +67,8 @@ public class FileStreamProtocol extends Protocol { private String selectedFileName; private boolean replaceTimestamp; private float speedUp; + + private boolean replayOnce; private int timeBetweenReplay; private ScheduledExecutorService executor; @@ -73,32 +81,45 @@ public class FileStreamProtocol extends Protocol { String selectedFileName, boolean replaceTimestamp, float speedUp, - int timeBetweenReplay) { + int timeBetweenReplay, + boolean replayOnce) { super(parser, format); this.selectedFileName = selectedFileName; this.replaceTimestamp = replaceTimestamp; this.speedUp = speedUp; this.timeBetweenReplay = timeBetweenReplay; + this.replayOnce = replayOnce; } @Override public void run(IAdapterPipeline adapterPipeline) throws AdapterException { String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema()); - executor = Executors.newScheduledThreadPool(1); var eventProcessor = new LocalEventProcessor(adapterPipeline, timestampKey); - executor.scheduleAtFixedRate(() -> { - try (InputStream dataInputStream = getDataFromEndpoint()) { - format.reset(); - parser.parse(dataInputStream, eventProcessor); - } catch (ParseException | IOException e) { - logger.error("Error while parsing: " + e.getMessage()); - } - }, 0, timeBetweenReplay, TimeUnit.SECONDS); + if (replayOnce) { + executor.schedule(() -> processFileInput(eventProcessor), + 0, + TimeUnit.SECONDS); + } else { + executor.scheduleAtFixedRate(() -> processFileInput(eventProcessor), + 0, + timeBetweenReplay, + TimeUnit.SECONDS); + } } + private void processFileInput(LocalEventProcessor eventProcessor) { + try (InputStream dataInputStream = getDataFromEndpoint()) { + format.reset(); + parser.parse(dataInputStream, eventProcessor); + } catch (ParseException | IOException e) { + logger.error("Error while parsing: " + e.getMessage()); + } + } + + private class LocalEventProcessor implements EmitBinaryEvent { private final IAdapterPipeline adapterPipeline; @@ -167,18 +188,17 @@ public class FileStreamProtocol extends Protocol { @Override public Protocol getInstance(ProtocolDescription protocolDescription, IParser parser, IFormat format) { - StaticPropertyExtractor extractor = + var extractor = StaticPropertyExtractor.from(protocolDescription.getConfig(), new ArrayList<>()); - List<String> replaceTimestampStringList = extractor.selectedMultiValues("replaceTimestamp", String.class); - boolean replaceTimestamp = replaceTimestampStringList.size() != 0; - - float speedUp = extractor.singleValueParameter("speed", Float.class); - - int timeBetweenReplay = 1; + var replaceTimestampStringList = extractor.selectedMultiValues(REPLACE_TIMESTAMP, String.class); + var replaceTimestamp = replaceTimestampStringList.size() != 0; + var speedUp = extractor.singleValueParameter(SPEED, Float.class); + var timeBetweenReplay = 1; + var fileName = extractor.selectedFilename(FILE_PATH); + var replayOnce = extractor.selectedSingleValue(REPLAY_ONCE, String.class).equals("yes"); - String fileName = extractor.selectedFilename("filePath"); - return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay); + return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay, replayOnce); } private String getTimestampKey(EventSchema eventSchema) throws AdapterException { @@ -198,10 +218,11 @@ public class FileStreamProtocol extends Protocol { .withLocales(Locales.EN) .sourceType(AdapterSourceType.STREAM) .category(AdapterType.Generic) - .requiredFile(Labels.withId("filePath"), Filetypes.CSV, Filetypes.JSON, Filetypes.XML) - .requiredMultiValueSelection(Labels.withId("replaceTimestamp"), + .requiredFile(Labels.withId(FILE_PATH), Filetypes.CSV, Filetypes.JSON, Filetypes.XML) + .requiredMultiValueSelection(Labels.withId(REPLACE_TIMESTAMP), Options.from("")) - .requiredFloatParameter(Labels.withId("speed")) + .requiredSingleValueSelection(Labels.withId(REPLAY_ONCE), Options.from("no", "yes")) + .requiredFloatParameter(Labels.withId(SPEED)) .build(); } diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en index 597753f3f..15a9eb247 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.file/strings.en @@ -28,3 +28,6 @@ replaceTimestamp.description=Replace Event Time with Current Timestamp speed.title=Replay Speed speed.description=original = 1; speedup 2x = 2; half speed = 0.5 +replayOnce.title=Replay Once +replayOnce.description='yes' file is only replayed once, 'no' the file is replayed till adapter is stopped +
