This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/SP-1085 by this push:
new 8fd086049 [#1085] Replace thread with executor service in
FileStreamProtocol
8fd086049 is described below
commit 8fd0860492d0ede12d4c7684132ec365ef604ce9
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jan 13 16:22:23 2023 +0100
[#1085] Replace thread with executor service in FileStreamProtocol
---
.../iiot/protocol/stream/FileStreamProtocol.java | 75 +++++++++++-----------
1 file changed, 38 insertions(+), 37 deletions(-)
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index a07813dbb..060a49cab 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -22,6 +22,7 @@ import
org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
import org.apache.streampipes.extensions.api.connect.IFormat;
import org.apache.streampipes.extensions.api.connect.IParser;
+import
org.apache.streampipes.extensions.api.connect.exception.AdapterException;
import org.apache.streampipes.extensions.api.connect.exception.ParseException;
import org.apache.streampipes.extensions.management.connect.SendToPipeline;
import
org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser;
@@ -51,10 +52,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
public class FileStreamProtocol extends Protocol {
- private static Logger logger =
LoggerFactory.getLogger(FileStreamProtocol.class);
+ private static final Logger logger =
LoggerFactory.getLogger(FileStreamProtocol.class);
public static final String ID =
"org.apache.streampipes.connect.iiot.protocol.stream.file";
@@ -63,15 +66,17 @@ public class FileStreamProtocol extends Protocol {
private float speedUp;
private int timeBetweenReplay;
- private Thread task;
- private boolean running;
-
+ private ExecutorService executor;
public FileStreamProtocol() {
}
- public FileStreamProtocol(IParser parser, IFormat format, String
selectedFileName,
- boolean replaceTimestamp, float speedUp, int
timeBetweenReplay) {
+ public FileStreamProtocol(IParser parser,
+ IFormat format,
+ String selectedFileName,
+ boolean replaceTimestamp,
+ float speedUp,
+ int timeBetweenReplay) {
super(parser, format);
this.selectedFileName = selectedFileName;
this.replaceTimestamp = replaceTimestamp;
@@ -80,7 +85,7 @@ public class FileStreamProtocol extends Protocol {
}
@Override
- public void run(IAdapterPipeline adapterPipeline) {
+ public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
String timestampKey =
getTimestampKey(adapterPipeline.getResultingEventSchema());
// exchange adapter pipeline sink with special purpose replay sink for
file replay
@@ -106,40 +111,31 @@ public class FileStreamProtocol extends Protocol {
speedUp));
}
- running = true;
- task = new Thread() {
- @Override
- public void run() {
- while (running) {
-
- format.reset();
- SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
- InputStream dataInputStream = getDataFromEndpoint();
- try {
- if (dataInputStream != null) {
- parser.parse(dataInputStream, stk);
- } else {
- logger.warn("Could not read data from file.");
- }
- } catch (ParseException e) {
- logger.error("Error while parsing: " + e.getMessage());
- }
-
- try {
- Thread.sleep(timeBetweenReplay * 1000);
- } catch (InterruptedException e) {
- logger.error("Error while waiting for next replay round" +
e.getMessage());
- }
- }
+ executor = Executors.newSingleThreadExecutor();
+
+ executor.execute(() -> {
+ format.reset();
+ SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
+ InputStream dataInputStream = getDataFromEndpoint();
+ try {
+ parser.parse(dataInputStream, stk);
+ } catch (ParseException e) {
+ logger.error("Error while parsing: " + e.getMessage());
+ }
+
+ try {
+ Thread.sleep(timeBetweenReplay * 1000L);
+ } catch (InterruptedException e) {
+ logger.error("Error while waiting for next replay round" +
e.getMessage());
}
- };
- task.start();
+ });
}
@Override
public void stop() {
- running = false;
+ executor.shutdown();
+ logger.info("Stopped file stream adapter for file " + selectedFileName);
}
private InputStream getDataFromEndpoint() throws ParseException {
@@ -168,9 +164,14 @@ public class FileStreamProtocol extends Protocol {
return new FileStreamProtocol(parser, format, fileName, replaceTimestamp,
speedUp, timeBetweenReplay);
}
- private String getTimestampKey(EventSchema eventSchema) {
+ private String getTimestampKey(EventSchema eventSchema) throws
AdapterException {
var timestampProperty = EventSchemaUtils.getTimestampProperty(eventSchema);
- return timestampProperty.get().getRuntimeName();
+ if (timestampProperty.isPresent()) {
+ return timestampProperty.get().getRuntimeName();
+ } else {
+ throw new AdapterException("No timestamp present in event schema");
+ }
+
}
@Override