This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch fix-error-handling-connect in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit c45125abfab94072ee1757a786c2167aa9882cc6 Author: Dominik Riemer <[email protected]> AuthorDate: Tue Jun 17 23:05:26 2025 +0200 fix: Add exception handling for failing preprocessing rules --- .../management/connect/AdapterWorkerManagement.java | 2 +- .../connect/adapter/model/EventCollector.java | 18 ++++++++++++++---- .../management/monitoring/ExtensionsLogger.java | 10 ++++++++++ 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java index 0d9cfd31f2..3b495a3cb8 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java @@ -70,8 +70,8 @@ public class AdapterWorkerManagement { var registeredParsers = newAdapterInstance.declareConfig().getSupportedParsers(); var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers); - var eventCollector = EventCollector.from(adapterDescription); var runtimeContext = makeRuntimeContext(adapterDescription.getElementId()); + var eventCollector = EventCollector.from(adapterDescription, runtimeContext); newAdapterInstance.onAdapterStarted(extractor, eventCollector, runtimeContext); } else { diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java index c59868c8b3..13929db693 100644 --- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java @@ -19,6 +19,7 @@ package org.apache.streampipes.extensions.management.connect.adapter.model; import org.apache.streampipes.extensions.api.connect.IEventCollector; +import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; import org.apache.streampipes.extensions.management.connect.adapter.AdapterPipelineGenerator; import org.apache.streampipes.extensions.management.connect.adapter.model.pipeline.AdapterPipeline; import org.apache.streampipes.model.connect.adapter.AdapterDescription; @@ -26,19 +27,28 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription; import java.util.Map; public class EventCollector implements IEventCollector { + private final AdapterPipeline adapterPipeline; + private final IAdapterRuntimeContext runtimeContext; - public EventCollector(AdapterPipeline adapterPipeline) { + public EventCollector(AdapterPipeline adapterPipeline, + IAdapterRuntimeContext runtimeContext) { this.adapterPipeline = adapterPipeline; + this.runtimeContext = runtimeContext; } - public static IEventCollector from(AdapterDescription adapterDescription) { + public static IEventCollector from(AdapterDescription adapterDescription, + IAdapterRuntimeContext runtimeContext) { var adapterPipeline = new AdapterPipelineGenerator().generatePipeline(adapterDescription); - return new EventCollector(adapterPipeline); + return new EventCollector(adapterPipeline, runtimeContext); } @Override public void collect(Map<String, Object> event) { - adapterPipeline.process(event); + try { + adapterPipeline.process(event); + } catch (RuntimeException e) { + runtimeContext.getLogger().error(e); + } } } diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java index bfe9b56be8..9c01a9549c 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java @@ -23,8 +23,14 @@ import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager; import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.model.monitoring.SpLogMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + public class ExtensionsLogger implements IExtensionsLogger { + private static final Logger LOG = LoggerFactory.getLogger(ExtensionsLogger.class); + private final String resourceId; private final SpMonitoringManager monitoringManager; @@ -42,21 +48,25 @@ public class ExtensionsLogger implements IExtensionsLogger { @Override public void error(Exception e) { + LOG.error("Error while processing the internal pipeline", e); log(SpLogMessage.from(e)); } @Override public void error(String details, Exception e) { + LOG.error("Error while processing the internal pipeline", e); log(SpLogMessage.from(e, details)); } @Override public void info(String title, String details) { + LOG.info("Info message while processing the internal pipeline: {}, {}", title, details); log(SpLogMessage.info(title, details)); } @Override public void warn(String title, String details) { + LOG.warn("Warning while processing the internal pipeline: {}, {}", title, details); log(SpLogMessage.warn(title, details)); }
