This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/SP-1085 by this push:
     new 8fd086049 [#1085] Replace thread with executor service in FileStreamProtocol
8fd086049 is described below

commit 8fd0860492d0ede12d4c7684132ec365ef604ce9
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jan 13 16:22:23 2023 +0100

    [#1085] Replace thread with executor service in FileStreamProtocol
---
 .../iiot/protocol/stream/FileStreamProtocol.java   | 75 +++++++++++-----------
 1 file changed, 38 insertions(+), 37 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index a07813dbb..060a49cab 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
 import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
 import org.apache.streampipes.extensions.api.connect.IFormat;
 import org.apache.streampipes.extensions.api.connect.IParser;
+import 
org.apache.streampipes.extensions.api.connect.exception.AdapterException;
 import org.apache.streampipes.extensions.api.connect.exception.ParseException;
 import org.apache.streampipes.extensions.management.connect.SendToPipeline;
 import 
org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser;
@@ -51,10 +52,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class FileStreamProtocol extends Protocol {
 
-  private static Logger logger = 
LoggerFactory.getLogger(FileStreamProtocol.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(FileStreamProtocol.class);
 
   public static final String ID = 
"org.apache.streampipes.connect.iiot.protocol.stream.file";
 
@@ -63,15 +66,17 @@ public class FileStreamProtocol extends Protocol {
   private float speedUp;
   private int timeBetweenReplay;
 
-  private Thread task;
-  private boolean running;
-
+  private ExecutorService executor;
 
   public FileStreamProtocol() {
   }
 
-  public FileStreamProtocol(IParser parser, IFormat format, String 
selectedFileName,
-                            boolean replaceTimestamp, float speedUp, int 
timeBetweenReplay) {
+  public FileStreamProtocol(IParser parser,
+                            IFormat format,
+                            String selectedFileName,
+                            boolean replaceTimestamp,
+                            float speedUp,
+                            int timeBetweenReplay) {
     super(parser, format);
     this.selectedFileName = selectedFileName;
     this.replaceTimestamp = replaceTimestamp;
@@ -80,7 +85,7 @@ public class FileStreamProtocol extends Protocol {
   }
 
   @Override
-  public void run(IAdapterPipeline adapterPipeline) {
+  public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
     String timestampKey = 
getTimestampKey(adapterPipeline.getResultingEventSchema());
 
     // exchange adapter pipeline sink with special purpose replay sink for 
file replay
@@ -106,40 +111,31 @@ public class FileStreamProtocol extends Protocol {
           speedUp));
     }
 
-    running = true;
-    task = new Thread() {
-      @Override
-      public void run() {
-        while (running) {
-
-          format.reset();
-          SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
-          InputStream dataInputStream = getDataFromEndpoint();
-          try {
-            if (dataInputStream != null) {
-              parser.parse(dataInputStream, stk);
-            } else {
-              logger.warn("Could not read data from file.");
-            }
-          } catch (ParseException e) {
-            logger.error("Error while parsing: " + e.getMessage());
-          }
-
-          try {
-            Thread.sleep(timeBetweenReplay * 1000);
-          } catch (InterruptedException e) {
-            logger.error("Error while waiting for next replay round" + 
e.getMessage());
-          }
-        }
+    executor = Executors.newSingleThreadExecutor();
+
+    executor.execute(() -> {
+      format.reset();
+      SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
+      InputStream dataInputStream = getDataFromEndpoint();
+      try {
+        parser.parse(dataInputStream, stk);
+      } catch (ParseException e) {
+        logger.error("Error while parsing: " + e.getMessage());
+      }
+
+      try {
+        Thread.sleep(timeBetweenReplay * 1000L);
+      } catch (InterruptedException e) {
+        logger.error("Error while waiting for next replay round" + 
e.getMessage());
       }
-    };
-    task.start();
+    });
   }
 
 
   @Override
   public void stop() {
-    running = false;
+    executor.shutdown();
+    logger.info("Stopped file stream adapter for file " + selectedFileName);
   }
 
   private InputStream getDataFromEndpoint() throws ParseException {
@@ -168,9 +164,14 @@ public class FileStreamProtocol extends Protocol {
     return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, 
speedUp, timeBetweenReplay);
   }
 
-  private String getTimestampKey(EventSchema eventSchema) {
+  private String getTimestampKey(EventSchema eventSchema) throws 
AdapterException {
     var timestampProperty = EventSchemaUtils.getTimestampProperty(eventSchema);
-    return timestampProperty.get().getRuntimeName();
+    if (timestampProperty.isPresent()) {
+      return timestampProperty.get().getRuntimeName();
+    } else {
+      throw new AdapterException("No timestamp present in event schema");
+    }
+
   }
 
   @Override

Reply via email to