This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7f1d18defd fix: Add exception handling for failing preprocessing rules (#3681)
7f1d18defd is described below

commit 7f1d18defd450000b6c3f54c0c25ab3dc5220111
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Jun 18 16:19:02 2025 +0200

    fix: Add exception handling for failing preprocessing rules (#3681)
---
 .../management/connect/AdapterWorkerManagement.java    |  2 +-
 .../connect/adapter/model/EventCollector.java          | 18 ++++++++++++++----
 .../management/monitoring/ExtensionsLogger.java        | 10 ++++++++++
 3 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java
index 0d9cfd31f2..3b495a3cb8 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java
@@ -70,8 +70,8 @@ public class AdapterWorkerManagement {
 
       var registeredParsers = newAdapterInstance.declareConfig().getSupportedParsers();
       var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers);
-      var eventCollector = EventCollector.from(adapterDescription);
       var runtimeContext = makeRuntimeContext(adapterDescription.getElementId());
+      var eventCollector = EventCollector.from(adapterDescription, runtimeContext);
 
       newAdapterInstance.onAdapterStarted(extractor, eventCollector, runtimeContext);
     } else {
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java
index c59868c8b3..13929db693 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.extensions.management.connect.adapter.model;
 
 import org.apache.streampipes.extensions.api.connect.IEventCollector;
+import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
 import org.apache.streampipes.extensions.management.connect.adapter.AdapterPipelineGenerator;
 import org.apache.streampipes.extensions.management.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -26,19 +27,28 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import java.util.Map;
 
 public class EventCollector implements IEventCollector {
+
   private final AdapterPipeline adapterPipeline;
+  private final IAdapterRuntimeContext runtimeContext;
 
-  public EventCollector(AdapterPipeline adapterPipeline) {
+  public EventCollector(AdapterPipeline adapterPipeline,
+                        IAdapterRuntimeContext runtimeContext) {
     this.adapterPipeline = adapterPipeline;
+    this.runtimeContext = runtimeContext;
   }
 
-  public static IEventCollector from(AdapterDescription adapterDescription) {
+  public static IEventCollector from(AdapterDescription adapterDescription,
+                                     IAdapterRuntimeContext runtimeContext) {
     var adapterPipeline = new AdapterPipelineGenerator().generatePipeline(adapterDescription);
-    return new EventCollector(adapterPipeline);
+    return new EventCollector(adapterPipeline, runtimeContext);
   }
 
   @Override
   public void collect(Map<String, Object> event) {
-    adapterPipeline.process(event);
+    try {
+      adapterPipeline.process(event);
+    } catch (RuntimeException e) {
+      runtimeContext.getLogger().error(e);
+    }
   }
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java
index bfe9b56be8..9c01a9549c 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java
@@ -23,8 +23,14 @@ import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
 import org.apache.streampipes.model.monitoring.SpLogEntry;
 import org.apache.streampipes.model.monitoring.SpLogMessage;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
 public class ExtensionsLogger implements IExtensionsLogger {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ExtensionsLogger.class);
+
   private final String resourceId;
   private final SpMonitoringManager monitoringManager;
 
@@ -42,21 +48,25 @@ public class ExtensionsLogger implements IExtensionsLogger {
 
   @Override
   public void error(Exception e) {
+    LOG.error("Error while processing the internal pipeline", e);
     log(SpLogMessage.from(e));
   }
 
   @Override
   public void error(String details, Exception e) {
+    LOG.error("Error while processing the internal pipeline", e);
     log(SpLogMessage.from(e, details));
   }
 
   @Override
   public void info(String title, String details) {
+    LOG.info("Info message while processing the internal pipeline: {}, {}", title, details);
     log(SpLogMessage.info(title, details));
   }
 
   @Override
   public void warn(String title, String details) {
+    LOG.warn("Warning while processing the internal pipeline: {}, {}", title, details);
     log(SpLogMessage.warn(title, details));
   }
 

Reply via email to