This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7f1d18defd fix: Add exception handling for failing preprocessing rules
(#3681)
7f1d18defd is described below
commit 7f1d18defd450000b6c3f54c0c25ab3dc5220111
Author: Dominik Riemer <[email protected]>
AuthorDate: Wed Jun 18 16:19:02 2025 +0200
fix: Add exception handling for failing preprocessing rules (#3681)
---
.../management/connect/AdapterWorkerManagement.java | 2 +-
.../connect/adapter/model/EventCollector.java | 18 ++++++++++++++----
.../management/monitoring/ExtensionsLogger.java | 10 ++++++++++
3 files changed, 25 insertions(+), 5 deletions(-)
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java
index 0d9cfd31f2..3b495a3cb8 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java
@@ -70,8 +70,8 @@ public class AdapterWorkerManagement {
var registeredParsers =
newAdapterInstance.declareConfig().getSupportedParsers();
var extractor = AdapterParameterExtractor.from(adapterDescription,
registeredParsers);
- var eventCollector = EventCollector.from(adapterDescription);
var runtimeContext =
makeRuntimeContext(adapterDescription.getElementId());
+ var eventCollector = EventCollector.from(adapterDescription,
runtimeContext);
newAdapterInstance.onAdapterStarted(extractor, eventCollector,
runtimeContext);
} else {
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java
index c59868c8b3..13929db693 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/EventCollector.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.extensions.management.connect.adapter.model;
import org.apache.streampipes.extensions.api.connect.IEventCollector;
+import
org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import
org.apache.streampipes.extensions.management.connect.adapter.AdapterPipelineGenerator;
import
org.apache.streampipes.extensions.management.connect.adapter.model.pipeline.AdapterPipeline;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -26,19 +27,28 @@ import
org.apache.streampipes.model.connect.adapter.AdapterDescription;
import java.util.Map;
public class EventCollector implements IEventCollector {
+
private final AdapterPipeline adapterPipeline;
+ private final IAdapterRuntimeContext runtimeContext;
- public EventCollector(AdapterPipeline adapterPipeline) {
+ public EventCollector(AdapterPipeline adapterPipeline,
+ IAdapterRuntimeContext runtimeContext) {
this.adapterPipeline = adapterPipeline;
+ this.runtimeContext = runtimeContext;
}
- public static IEventCollector from(AdapterDescription adapterDescription) {
+ public static IEventCollector from(AdapterDescription adapterDescription,
+ IAdapterRuntimeContext runtimeContext) {
var adapterPipeline = new
AdapterPipelineGenerator().generatePipeline(adapterDescription);
- return new EventCollector(adapterPipeline);
+ return new EventCollector(adapterPipeline, runtimeContext);
}
@Override
public void collect(Map<String, Object> event) {
- adapterPipeline.process(event);
+ try {
+ adapterPipeline.process(event);
+ } catch (RuntimeException e) {
+ runtimeContext.getLogger().error(e);
+ }
}
}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java
index bfe9b56be8..9c01a9549c 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ExtensionsLogger.java
@@ -23,8 +23,14 @@ import
org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.model.monitoring.SpLogEntry;
import org.apache.streampipes.model.monitoring.SpLogMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
public class ExtensionsLogger implements IExtensionsLogger {
+ private static final Logger LOG =
LoggerFactory.getLogger(ExtensionsLogger.class);
+
private final String resourceId;
private final SpMonitoringManager monitoringManager;
@@ -42,21 +48,25 @@ public class ExtensionsLogger implements IExtensionsLogger {
@Override
public void error(Exception e) {
+ LOG.error("Error while processing the internal pipeline", e);
log(SpLogMessage.from(e));
}
@Override
public void error(String details, Exception e) {
+ LOG.error("Error while processing the internal pipeline", e);
log(SpLogMessage.from(e, details));
}
@Override
public void info(String title, String details) {
+ LOG.info("Info message while processing the internal pipeline: {}, {}",
title, details);
log(SpLogMessage.info(title, details));
}
@Override
public void warn(String title, String details) {
+ LOG.warn("Warning while processing the internal pipeline: {}, {}", title,
details);
log(SpLogMessage.warn(title, details));
}